diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index cc319ed3436e..c8e8edee4214 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -41,7 +41,8 @@ use crate::*;
 
 const DEFAULT_GCS_ENDPOINT: &str = "https://storage.googleapis.com";
 const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read_write";
-
+/// It's recommended that you use at least 8 MiB for the chunk size.
+const DEFAULT_WRITE_FIXED_SIZE: usize = 8 * 1024 * 1024;
 /// Google Cloud Storage service.
 ///
 /// # Capabilities
@@ -120,6 +121,9 @@ pub struct GcsBuilder {
     customed_token_loader: Option<Box<dyn GoogleTokenLoad>>,
     predefined_acl: Option<String>,
     default_storage_class: Option<String>,
+
+    /// the fixed size writer uses to flush into underlying storage.
+    write_fixed_size: Option<usize>,
 }
 
 impl GcsBuilder {
@@ -237,6 +241,16 @@ impl GcsBuilder {
         };
         self
     }
+
+    /// The buffer size should be a multiple of 256 KiB (256 x 1024 bytes), unless it's the last chunk that completes the upload.
+    /// Larger chunk sizes typically make uploads faster, but note that there's a tradeoff between speed and memory usage.
+    /// It's recommended that you use at least 8 MiB for the chunk size.
+    /// Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
+    pub fn write_fixed_size(&mut self, fixed_buffer_size: usize) -> &mut Self {
+        self.write_fixed_size = Some(fixed_buffer_size);
+
+        self
+    }
 }
 
 impl Debug for GcsBuilder {
@@ -336,6 +350,15 @@ impl Builder for GcsBuilder {
 
         let signer = GoogleSigner::new("storage");
 
+        let write_fixed_size = self.write_fixed_size.unwrap_or(DEFAULT_WRITE_FIXED_SIZE);
+        if write_fixed_size % (256 * 1024) != 0 {
+            return Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "the write fixed buffer size must be a multiple of 256 KiB",
+            )
+            .with_context("service", Scheme::Gcs));
+        }
+
         let backend = GcsBackend {
             core: Arc::new(GcsCore {
                 endpoint,
@@ -347,6 +370,7 @@ impl Builder for GcsBuilder {
                 credential_loader: cred_loader,
                 predefined_acl: self.predefined_acl.clone(),
                 default_storage_class: self.default_storage_class.clone(),
+                write_fixed_size,
             }),
         };
 
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index b4b4278cc486..5ac9dc207681 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -54,6 +54,8 @@ pub struct GcsCore {
 
     pub predefined_acl: Option<String>,
     pub default_storage_class: Option<String>,
+
+    pub write_fixed_size: usize,
 }
 
 impl Debug for GcsCore {
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index c4ed1f6e0449..e853de83b2b5 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -35,11 +35,12 @@ pub struct GcsWriter {
     location: Option<String>,
     written: u64,
     buffer: oio::VectorCursor,
-    buffer_size: usize,
+    write_fixed_size: usize,
 }
 
 impl GcsWriter {
     pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
+        let write_fixed_size = core.write_fixed_size;
         GcsWriter {
             core,
             path: path.to_string(),
@@ -48,17 +49,7 @@ impl GcsWriter {
             location: None,
             written: 0,
             buffer: oio::VectorCursor::new(),
-            // The chunk size should be a multiple of 256 KiB
-            // (256 x 1024 bytes), unless it's the last chunk
-            // that completes the upload.
-            //
-            // Larger chunk sizes typically make uploads faster,
-            // but note that there's a tradeoff between speed and
-            // memory usage. It's recommended that you use at least
-            // 8 MiB for the chunk size.
-            //
-            // TODO: allow this value to be configured.
-            buffer_size: 8 * 1024 * 1024,
+            write_fixed_size,
         }
     }
 
@@ -151,16 +142,16 @@ impl oio::Write for GcsWriter {
         self.buffer.push(bs);
 
         // Return directly if the buffer is not full
-        if self.buffer.len() <= self.buffer_size {
+        if self.buffer.len() <= self.write_fixed_size {
             return Ok(());
         }
 
-        let bs = self.buffer.peak_exact(self.buffer_size);
+        let bs = self.buffer.peak_exact(self.write_fixed_size);
 
         match self.write_part(location, bs).await {
             Ok(_) => {
-                self.buffer.take(self.buffer_size);
-                self.written += self.buffer_size as u64;
+                self.buffer.take(self.write_fixed_size);
+                self.written += self.write_fixed_size as u64;
                 Ok(())
             }
             Err(e) => {
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 189ec2daa92c..1f42d0766f95 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -38,6 +38,7 @@ use crate::ops::*;
 use crate::raw::*;
 use crate::*;
 
+const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
 /// Aliyun Object Storage Service (OSS) support
 ///
 /// # Capabilities
@@ -123,6 +124,8 @@ pub struct OssBuilder {
     access_key_secret: Option<String>,
 
     http_client: Option<HttpClient>,
+    /// the size of each part, and the range is 5MB ~ 5 GB.
+    write_min_size: Option<usize>,
 }
 
 impl Debug for OssBuilder {
@@ -292,6 +295,14 @@ impl OssBuilder {
         }
         self
     }
+
+    /// set the minimum size of unsized write, it should be greater than 5 MB.
+    /// Reference: [OSS Multipart upload](https://www.alibabacloud.com/help/en/object-storage-service/latest/multipart-upload-6)
+    pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
+        self.write_min_size = Some(write_min_size);
+
+        self
+    }
 }
 
 impl Builder for OssBuilder {
@@ -313,6 +324,8 @@ impl Builder for OssBuilder {
             .map(|v| builder.server_side_encryption(v));
         map.get("server_side_encryption_key_id")
             .map(|v| builder.server_side_encryption_key_id(v));
+        map.get("write_min_size")
+            .map(|v| builder.write_min_size(v.parse::<usize>().unwrap_or(0)));
 
         builder
     }
@@ -384,6 +397,14 @@ impl Builder for OssBuilder {
 
         let signer = AliyunOssSigner::new(bucket);
 
+        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
+        if write_min_size < 5 * 1024 * 1024 {
+            return Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "the write minimum buffer size must be at least 5 MiB",
+            )
+            .with_context("service", Scheme::Oss));
+        }
         debug!("Backend build finished");
 
         Ok(OssBackend {
@@ -398,6 +419,7 @@ impl Builder for OssBuilder {
                 client,
                 server_side_encryption,
                 server_side_encryption_key_id,
+                write_min_size,
             }),
         })
     }
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 8a4670e7e3f6..68297dfcc492 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -65,6 +65,7 @@ pub struct OssCore {
     pub client: HttpClient,
     pub loader: AliyunLoader,
     pub signer: AliyunOssSigner,
+    pub write_min_size: usize,
 }
 
 impl Debug for OssCore {
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 3e6c49e7c3d2..9a75a4aaca6f 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -42,6 +42,7 @@ pub struct OssWriter {
 
 impl OssWriter {
     pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
+        let buffer_size = core.write_min_size;
         OssWriter {
             core,
             path: path.to_string(),
@@ -50,13 +51,7 @@ impl OssWriter {
             upload_id: None,
             parts: vec![],
             buffer: oio::VectorCursor::new(),
-            // The part size must be 5 MiB to 5 GiB. There is no minimum
-            // size limit on the last part of your multipart upload.
-            //
-            // We pick the default value as 8 MiB for better thoughput.
-            //
-            // TODO: allow this value to be configured.
-            buffer_size: 8 * 1024 * 1024,
+            buffer_size,
         }
     }
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index acd17dc1260a..f0bc56896756 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -55,6 +55,7 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new
     m
 });
 
+const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
 /// Aws S3 and compatible services (including minio, digitalocean space, Tencent Cloud Object Storage(COS) and so on) support.
 /// For more information about s3-compatible services, refer to [Compatible Services](#compatible-services).
 ///
@@ -305,6 +306,10 @@ pub struct S3Builder {
 
     http_client: Option<HttpClient>,
     customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,
+
+    /// the part size of s3 multipart upload, which should be 5 MiB to 5 GiB.
+    /// There is no minimum size limit on the last part of your multipart upload
+    write_min_size: Option<usize>,
 }
 
 impl Debug for S3Builder {
@@ -705,6 +710,14 @@ impl S3Builder {
 
         endpoint
     }
+
+    /// set the minimum size of unsized write, it should be greater than 5 MB.
+    /// Reference: [Amazon S3 multipart upload limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
+    pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
+        self.write_min_size = Some(write_min_size);
+
+        self
+    }
 }
 
 impl Builder for S3Builder {
@@ -872,6 +885,14 @@ impl Builder for S3Builder {
         }
 
         let signer = AwsV4Signer::new("s3", &region);
+        let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
+        if write_min_size < 5 * 1024 * 1024 {
+            return Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "the write minimum buffer size must be at least 5 MiB",
+            )
+            .with_context("service", Scheme::S3));
+        }
 
         debug!("backend build finished");
         Ok(S3Backend {
@@ -888,6 +909,7 @@ impl Builder for S3Builder {
                 signer,
                 loader,
                 client,
+                write_min_size,
             }),
         })
     }
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index eba880c07e55..e27fbbf3117c 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -86,6 +86,7 @@ pub struct S3Core {
     pub signer: AwsV4Signer,
     pub loader: AwsLoader,
     pub client: HttpClient,
+    pub write_min_size: usize,
 }
 
 impl Debug for S3Core {
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 93ee59a833c3..8898fd945ac0 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -42,6 +42,7 @@ pub struct S3Writer {
 
 impl S3Writer {
     pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
+        let buffer_size = core.write_min_size;
         S3Writer {
             core,
             path: path.to_string(),
@@ -50,13 +51,7 @@ impl S3Writer {
             upload_id: None,
             parts: vec![],
             buffer: oio::VectorCursor::new(),
-            // The part size must be 5 MiB to 5 GiB. There is no minimum
-            // size limit on the last part of your multipart upload.
-            //
-            // We pick the default value as 8 MiB for better throughput.
-            //
-            // TODO: allow this value to be configured.
-            buffer_size: 8 * 1024 * 1024,
+            buffer_size,
         }
     }