Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(writer): configurable buffer size of unsized write #2143

Merged
merged 12 commits into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use crate::*;

const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";

/// It's recommended that you use at least 8 MiB for the chunk size.
const DEFAULT_WRITE_FIXED_SIZE: usize = 8 * 1024 * 1024;
/// Google Cloud Storage service.
///
/// # Capabilities
Expand Down Expand Up @@ -119,6 +120,9 @@ pub struct GcsBuilder {
customed_token_loader: Option<Box<dyn GoogleTokenLoad>>,
predefined_acl: Option<String>,
default_storage_class: Option<String>,

/// the fixed size writer uses to flush into underlying storage.
write_fixed_size: Option<usize>,
}

impl GcsBuilder {
Expand Down Expand Up @@ -236,6 +240,16 @@ impl GcsBuilder {
};
self
}

/// The buffer size should be a multiple of 256 KiB (256 x 1024 bytes), unless it's the last chunk that completes the upload.
/// Larger chunk sizes typically make uploads faster, but note that there's a tradeoff between speed and memory usage.
/// It's recommended that you use at least 8 MiB for the chunk size.
/// Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
pub fn write_fixed_size(&mut self, fixed_buffer_size: usize) -> &mut Self {
self.write_fixed_size = Some(fixed_buffer_size);

self
}
}

impl Debug for GcsBuilder {
Expand Down Expand Up @@ -335,6 +349,15 @@ impl Builder for GcsBuilder {

let signer = GoogleSigner::new("storage");

let write_fixed_size = self.write_fixed_size.unwrap_or(DEFAULT_WRITE_FIXED_SIZE);
if write_fixed_size.checked_rem_euclid(256 * 1024).is_some() {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"The write fixed buffer size is misconfigured",
)
.with_context("service", Scheme::Gcs));
}

let backend = GcsBackend {
core: Arc::new(GcsCore {
endpoint,
Expand All @@ -346,6 +369,7 @@ impl Builder for GcsBuilder {
credential_loader: cred_loader,
predefined_acl: self.predefined_acl.clone(),
default_storage_class: self.default_storage_class.clone(),
write_fixed_size,
}),
};

Expand Down
2 changes: 2 additions & 0 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct GcsCore {

pub predefined_acl: Option<String>,
pub default_storage_class: Option<String>,

pub write_fixed_size: usize,
}

impl Debug for GcsCore {
Expand Down
23 changes: 7 additions & 16 deletions core/src/services/gcs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ pub struct GcsWriter {
location: Option<String>,
written: u64,
buffer: oio::VectorCursor,
buffer_size: usize,
write_fixed_size: usize,
}

impl GcsWriter {
pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
let write_fixed_size = core.write_fixed_size;
GcsWriter {
core,
path: path.to_string(),
Expand All @@ -48,17 +49,7 @@ impl GcsWriter {
location: None,
written: 0,
buffer: oio::VectorCursor::new(),
// The chunk size should be a multiple of 256 KiB
// (256 x 1024 bytes), unless it's the last chunk
// that completes the upload.
//
// Larger chunk sizes typically make uploads faster,
// but note that there's a tradeoff between speed and
// memory usage. It's recommended that you use at least
// 8 MiB for the chunk size.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
write_fixed_size,
}
}

Expand Down Expand Up @@ -152,16 +143,16 @@ impl oio::Write for GcsWriter {

self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.buffer_size {
if self.buffer.len() <= self.write_fixed_size {
return Ok(());
}

let bs = self.buffer.peak_exact(self.buffer_size);
let bs = self.buffer.peak_exact(self.write_fixed_size);

match self.write_part(location, bs).await {
Ok(_) => {
self.buffer.take(self.buffer_size);
self.written += self.buffer_size as u64;
self.buffer.take(self.write_fixed_size);
self.written += self.write_fixed_size as u64;
Ok(())
}
Err(e) => {
Expand Down
22 changes: 22 additions & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::ops::*;
use crate::raw::*;
use crate::*;

const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
/// Aliyun Object Storage Service (OSS) support
///
/// # Capabilities
Expand Down Expand Up @@ -123,6 +124,8 @@ pub struct OssBuilder {
access_key_secret: Option<String>,

http_client: Option<HttpClient>,
/// the size of each part, and the range is 5MB ~ 5 GB.
write_min_size: Option<usize>,
}

impl Debug for OssBuilder {
Expand Down Expand Up @@ -292,6 +295,14 @@ impl OssBuilder {
}
self
}

/// set the minimum size of unsized write, it should be greater than 5 MB.
/// Reference: [OSS Multipart upload](https://www.alibabacloud.com/help/en/object-storage-service/latest/multipart-upload-6)
pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
self.write_min_size = Some(write_min_size);

self
}
}

impl Builder for OssBuilder {
Expand All @@ -313,6 +324,8 @@ impl Builder for OssBuilder {
.map(|v| builder.server_side_encryption(v));
map.get("server_side_encryption_key_id")
.map(|v| builder.server_side_encryption_key_id(v));
map.get("write_min_size")
.map(|v| builder.write_min_size(v.parse::<usize>().unwrap()));
builder
}

Expand Down Expand Up @@ -384,6 +397,14 @@ impl Builder for OssBuilder {

let signer = AliyunOssSigner::new(bucket);

let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
if write_min_size < 5 * 1024 * 1024 {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"The write minimum buffer size is misconfigured",
)
.with_context("service", Scheme::Oss));
}
debug!("Backend build finished");

Ok(OssBackend {
Expand All @@ -398,6 +419,7 @@ impl Builder for OssBuilder {
client,
server_side_encryption,
server_side_encryption_key_id,
write_min_size,
}),
})
}
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct OssCore {
pub client: HttpClient,
pub loader: AliyunLoader,
pub signer: AliyunOssSigner,
pub write_min_size: usize,
}

impl Debug for OssCore {
Expand Down
9 changes: 2 additions & 7 deletions core/src/services/oss/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct OssWriter {

impl OssWriter {
pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_min_size;
OssWriter {
core,
path: path.to_string(),
Expand All @@ -50,13 +51,7 @@ impl OssWriter {
upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
// The part size must be 5 MiB to 5 GiB. There is no minimum
// size limit on the last part of your multipart upload.
//
// We pick the default value as 8 MiB for better thoughput.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
buffer_size,
}
}

Expand Down
29 changes: 29 additions & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new
m
});

const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
/// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
/// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
///
Expand Down Expand Up @@ -304,6 +305,10 @@ pub struct S3Builder {

http_client: Option<HttpClient>,
customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,

/// the part size of s3 multipart upload, which shoule be 5 MiB to 5 GiB.
/// There is no minimum size limit on the last part of your multipart upload
write_min_size: Option<usize>,
}

impl Debug for S3Builder {
Expand Down Expand Up @@ -704,6 +709,21 @@ impl S3Builder {

endpoint
}

/// set the minimum size of unsized write, it should be greater than 5 MB.
/// Reference: [Amazon S3 multipart upload limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
pub fn write_min_size(&mut self, write_min_size: usize) -> Result<&mut Self> {
if write_min_size > 5 * 1024 * 1024 {
self.write_min_size = Some(write_min_size);
} else {
return Err(
Error::new(ErrorKind::ConfigInvalid, "The buffer size is misconfigured")
.with_context("service", Scheme::S3),
);
}

Ok(self)
}
}

impl Builder for S3Builder {
Expand Down Expand Up @@ -871,6 +891,14 @@ impl Builder for S3Builder {
}

let signer = AwsV4Signer::new("s3", &region);
let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
if write_min_size < 5 * 1024 * 1024 {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"The write minimum buffer size is misconfigured",
)
.with_context("service", Scheme::S3));
}

debug!("backend build finished");
Ok(S3Backend {
Expand All @@ -887,6 +915,7 @@ impl Builder for S3Builder {
signer,
loader,
client,
write_min_size,
}),
})
}
Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct S3Core {
pub signer: AwsV4Signer,
pub loader: AwsLoader,
pub client: HttpClient,
pub write_min_size: usize,
}

impl Debug for S3Core {
Expand Down
9 changes: 2 additions & 7 deletions core/src/services/s3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct S3Writer {

impl S3Writer {
pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_min_size;
S3Writer {
core,
path: path.to_string(),
Expand All @@ -50,13 +51,7 @@ impl S3Writer {
upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
// The part size must be 5 MiB to 5 GiB. There is no minimum
// size limit on the last part of your multipart upload.
//
// We pick the default value as 8 MiB for better thoughput.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
buffer_size,
}
}

Expand Down