From 169094dd04eb43a66fc4b6f478cec667aeb61b96 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 11 Apr 2023 22:18:36 +0800 Subject: [PATCH] refactor(services/s3): Migrate to async reqsign (#1909) * refactor(services/s3): Migrate to async reqsign Signed-off-by: Xuanwo * Fix test Signed-off-by: Xuanwo * Fix test Signed-off-by: Xuanwo * Add retry for s3 Signed-off-by: Xuanwo * Allow anonymous Signed-off-by: Xuanwo * Allow set disable ec2 metadata Signed-off-by: Xuanwo * Fix reqsign Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- Cargo.lock | 4 +- bindings/nodejs/src/lib.rs | 8 +- core/Cargo.toml | 2 +- core/src/raw/http_util/header.rs | 13 + core/src/raw/http_util/mod.rs | 1 + core/src/services/gcs/core.rs | 11 +- core/src/services/oss/backend.rs | 6 +- core/src/services/s3/backend.rs | 1036 +++------------------------ core/src/services/s3/core.rs | 849 ++++++++++++++++++++++ core/src/services/s3/mod.rs | 1 + core/src/services/s3/pager.rs | 24 +- core/src/services/s3/writer.rs | 32 +- core/src/types/operator/operator.rs | 2 +- core/src/types/ops.rs | 2 +- core/tests/behavior/presign.rs | 8 +- 15 files changed, 1030 insertions(+), 969 deletions(-) create mode 100644 core/src/services/s3/core.rs diff --git a/Cargo.lock b/Cargo.lock index 14daaa20f606..6e58ead9f9e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2253,7 +2253,7 @@ dependencies = [ "rand 0.8.5", "redis", "reqsign 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", - "reqsign 0.8.5 (git+https://github.com/Xuanwo/reqsign?rev=877292a171bfec9593df27cb4ec94676e77a9d57)", + "reqsign 0.8.5 (git+https://github.com/Xuanwo/reqsign?rev=fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e)", "reqwest", "rocksdb", "serde", @@ -3202,7 +3202,7 @@ dependencies = [ [[package]] name = "reqsign" version = "0.8.5" -source = "git+https://github.com/Xuanwo/reqsign?rev=877292a171bfec9593df27cb4ec94676e77a9d57#877292a171bfec9593df27cb4ec94676e77a9d57" +source = "git+https://github.com/Xuanwo/reqsign?rev=fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e#fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e" dependencies = [ "anyhow", "async-trait", diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 0c62ae34f5dd..aa14cfaa43be 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -20,11 +20,11 @@ extern crate napi_derive; use std::collections::HashMap; use std::str::FromStr; +use std::time::Duration; use futures::TryStreamExt; use napi::bindgen_prelude::*; use time::format_description::well_known::Rfc3339; -use time::Duration; fn build_operator( scheme: opendal::Scheme, @@ -487,7 +487,7 @@ impl Operator { pub async fn presign_read(&self, path: String, expires: u32) -> Result { let res = self .0 - .presign_read(&path, Duration::seconds(expires as i64)) + .presign_read(&path, Duration::from_secs(expires as u64)) .await .map_err(format_napi_error)?; Ok(PresignedRequest::new(res)) @@ -510,7 +510,7 @@ impl Operator { pub async fn presign_write(&self, path: String, expires: u32) -> Result { let res = self .0 - .presign_write(&path, Duration::seconds(expires as i64)) + .presign_write(&path, Duration::from_secs(expires as u64)) .await .map_err(format_napi_error)?; Ok(PresignedRequest::new(res)) @@ -533,7 +533,7 @@ impl Operator { pub async fn presign_stat(&self, path: String, expires: u32) -> Result { let res = self .0 - .presign_stat(&path, Duration::seconds(expires as i64)) + .presign_stat(&path, Duration::from_secs(expires as u64)) .await .map_err(format_napi_error)?; Ok(PresignedRequest::new(res)) diff --git a/core/Cargo.toml 
b/core/Cargo.toml index ac7e704b9bea..a13a8d49c1de 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -121,7 +121,7 @@ redis = { version = "0.22", features = [ ], optional = true } # NOTE: we keep this for service migration one by one. And finally we will replace reqsign by v0.9. reqsign = "0.8.5" -reqsign_0_9 = { package = "reqsign", git = "https://github.com/Xuanwo/reqsign", rev = "877292a171bfec9593df27cb4ec94676e77a9d57" } +reqsign_0_9 = { package = "reqsign", git = "https://github.com/Xuanwo/reqsign", rev = "fde88af3aecf4ba6c39e5d84dc39c5200f8f3a5e" } reqwest = { version = "0.11.13", features = [ "multipart", "stream", diff --git a/core/src/raw/http_util/header.rs b/core/src/raw/http_util/header.rs index 3c1fc7caae61..763d74b3dc06 100644 --- a/core/src/raw/http_util/header.rs +++ b/core/src/raw/http_util/header.rs @@ -26,6 +26,7 @@ use http::header::ETAG; use http::header::LAST_MODIFIED; use http::header::LOCATION; use http::HeaderMap; +use http::HeaderValue; use md5::Digest; use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; @@ -273,6 +274,18 @@ pub fn format_authorization_by_bearer(token: &str) -> Result { Ok(format!("Bearer {token}")) } +/// Build header value from given string. +pub fn build_header_value(v: &str) -> Result { + HeaderValue::from_str(v).map_err(|e| { + Error::new( + ErrorKind::ConfigInvalid, + "header value contains invalid characters", + ) + .with_operation("http_util::build_header_value") + .set_source(e) + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index ed5acaca82f1..ba5cbbcbea7c 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -30,6 +30,7 @@ pub use body::AsyncBody; pub use body::IncomingAsyncBody; mod header; +pub use header::build_header_value; pub use header::format_authorization_by_basic; pub use header::format_authorization_by_bearer; pub use header::format_content_md5; diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs index 845269834946..28a8c512504b 100644 --- a/core/src/services/gcs/core.rs +++ b/core/src/services/gcs/core.rs @@ -19,10 +19,13 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Write; +use backon::ExponentialBuilder; +use backon::Retryable; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; use http::Request; use http::Response; +use once_cell::sync::Lazy; use reqsign_0_9::GoogleCredentialLoader; use reqsign_0_9::GoogleSigner; use reqsign_0_9::GoogleToken; @@ -53,11 +56,13 @@ impl Debug for GcsCore { } } +static BACKOFF: Lazy = + Lazy::new(|| ExponentialBuilder::default().with_jitter()); + impl GcsCore { async fn load_token(&self) -> Result { - let cred = self - .token_loader - .load() + let cred = { || self.token_loader.load() } + .retry(&*BACKOFF) .await .map_err(new_request_credential_error)?; diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 713b7572c346..4f3e88bbd8fb 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -552,7 +552,11 @@ impl Accessor for OssBackend { }; self.signer - .sign_query(&mut req, args.expire()) + .sign_query( + &mut req, + // TODO: convert to std::time::Duration + time::Duration::seconds_f64(args.expire().as_secs_f64()), + ) .map_err(new_request_sign_error)?; // We don't need this request anymore, consume it directly. 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 52ae138e64bd..58b13a39150e 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -25,27 +25,17 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Buf; -use bytes::Bytes; -use http::header::HeaderName; -use http::header::CACHE_CONTROL; -use http::header::CONTENT_DISPOSITION; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; -use http::HeaderValue; -use http::Request; -use http::Response; use http::StatusCode; use log::debug; use md5::Digest; use md5::Md5; use once_cell::sync::Lazy; -use reqsign::AwsConfigLoader; -use reqsign::AwsCredentialLoad; -use reqsign::AwsCredentialLoader; -use reqsign::AwsV4Signer; -use serde::Deserialize; -use serde::Serialize; +use reqsign_0_9::AwsConfig; +use reqsign_0_9::AwsCredentialLoad; +use reqsign_0_9::AwsLoader; +use reqsign_0_9::AwsV4Signer; +use super::core::*; use super::error::parse_error; use super::pager::S3Pager; use super::writer::S3Writer; @@ -64,31 +54,6 @@ static ENDPOINT_TEMPLATES: Lazy> = Lazy::new m }); -mod constants { - pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source"; - - pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = - "x-amz-server-side-encryption-customer-algorithm"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = - "x-amz-server-side-encryption-customer-key"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = - "x-amz-server-side-encryption-customer-key-md5"; - pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str = - "x-amz-server-side-encryption-aws-kms-key-id"; - pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class"; - - pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = - "x-amz-copy-source-server-side-encryption-customer-algorithm"; - pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = - "x-amz-copy-source-server-side-encryption-customer-key"; - pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = - "x-amz-copy-source-server-side-encryption-customer-key-md5"; - - pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition"; - pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control"; -} - /// Aws S3 and compatible services (including minio, digitalocean space and so on) support /// /// # Capabilities @@ -311,7 +276,7 @@ mod constants { /// /// # Compatible Services #[doc = include_str!("compatible_services.md")] -#[derive(Default, Clone)] +#[derive(Default)] pub struct S3Builder { root: Option, @@ -333,10 +298,11 @@ pub struct S3Builder { security_token: Option, disable_config_load: bool, + disable_ec2_metadata: bool, enable_virtual_host_style: bool, http_client: Option, - customed_credential_load: Option>, + customed_credential_load: Option>, } impl Debug for S3Builder { @@ -346,39 +312,9 @@ impl Debug for S3Builder { d.field("root", &self.root) .field("bucket", &self.bucket) .field("endpoint", &self.endpoint) - .field("region", &self.region) - .field("role_arn", &self.role_arn) - .field("external_id", &self.external_id) - .field("disable_config_load", &self.disable_config_load) - .field("enable_virtual_host_style", &self.enable_virtual_host_style) - .field("default_storage_class", &self.default_storage_class); - - if self.access_key_id.is_some() { - d.field("access_key_id", &""); - } - if 
self.secret_access_key.is_some() { - d.field("secret_access_key", &""); - } - if self.server_side_encryption.is_some() { - d.field("server_side_encryption", &""); - } - if self.server_side_encryption_aws_kms_key_id.is_some() { - d.field("server_side_encryption_aws_kms_key_id", &""); - } - if self.server_side_encryption_customer_algorithm.is_some() { - d.field("server_side_encryption_customer_algorithm", &""); - } - if self.server_side_encryption_customer_key.is_some() { - d.field("server_side_encryption_customer_key", &""); - } - if self.server_side_encryption_customer_key_md5.is_some() { - d.field("server_side_encryption_customer_key_md5", &""); - } - if self.security_token.is_some() { - d.field("security_token", &""); - } + .field("region", &self.region); - d.finish() + d.finish_non_exhaustive() } } @@ -674,6 +610,15 @@ impl S3Builder { self } + /// Disable load credential from ec2 metadata. + /// + /// This option is used to disable the default behavior of opendal + /// to load credential from ec2 metadata, a.k.a, IMDSv2 + pub fn disable_ec2_metadata(&mut self) -> &mut Self { + self.disable_ec2_metadata = true; + self + } + /// Enable virtual host style so that opendal will send API requests /// in virtual host style instead of path style. /// @@ -685,8 +630,8 @@ impl S3Builder { } /// Adding a customed credential load for service. - pub fn customed_credential_load(&mut self, cred: impl AwsCredentialLoad) -> &mut Self { - self.customed_credential_load = Some(Arc::new(cred)); + pub fn customed_credential_load(&mut self, cred: Box) -> &mut Self { + self.customed_credential_load = Some(cred); self } @@ -790,6 +735,9 @@ impl Builder for S3Builder { map.get("disable_config_load") .filter(|v| *v == "on" || *v == "true") .map(|_| builder.disable_config_load()); + map.get("disable_ec2_metadata") + .filter(|v| *v == "on" || *v == "true") + .map(|_| builder.disable_ec2_metadata()); map.get("enable_virtual_host_style") .filter(|v| *v == "on" || *v == "true") .map(|_| builder.enable_virtual_host_style()); @@ -818,75 +766,48 @@ impl Builder for S3Builder { let default_storage_class = match &self.default_storage_class { None => None, - Some(v) => Some(v.parse().map_err(|e| { - Error::new( - ErrorKind::ConfigInvalid, - "default_storage_class value is invalid", - ) - .with_context("value", v) - .set_source(e) - })?), + Some(v) => Some( + build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?, + ), }; let server_side_encryption = match &self.server_side_encryption { None => None, - Some(v) => Some(v.parse().map_err(|e| { - Error::new( - ErrorKind::ConfigInvalid, - "server_side_encryption value is invalid", - ) - .with_context("value", v) - .set_source(e) - })?), + Some(v) => Some( + build_header_value(v) + .map_err(|err| err.with_context("key", "server_side_encryption"))?, + ), }; let server_side_encryption_aws_kms_key_id = match &self.server_side_encryption_aws_kms_key_id { None => None, - Some(v) => Some(v.parse().map_err(|e| { - Error::new( - ErrorKind::ConfigInvalid, - "server_side_encryption_aws_kms_key_id value is invalid", - ) - .with_context("value", v) - .set_source(e) + Some(v) => Some(build_header_value(v).map_err(|err| { + err.with_context("key", "server_side_encryption_aws_kms_key_id") })?), }; let server_side_encryption_customer_algorithm = match &self.server_side_encryption_customer_algorithm { None => None, - Some(v) => Some(v.parse().map_err(|e| { - Error::new( - ErrorKind::ConfigInvalid, - "server_side_encryption_customer_algorithm value is invalid", - ) - 
.with_context("value", v) - .set_source(e) + Some(v) => Some(build_header_value(v).map_err(|err| { + err.with_context("key", "server_side_encryption_customer_algorithm") + })?), + }; + + let server_side_encryption_customer_key = + match &self.server_side_encryption_customer_key { + None => None, + Some(v) => Some(build_header_value(v).map_err(|err| { + err.with_context("key", "server_side_encryption_customer_key") })?), }; - let server_side_encryption_customer_key = match &self.server_side_encryption_customer_key { - None => None, - Some(v) => Some(v.parse().map_err(|e| { - Error::new( - ErrorKind::ConfigInvalid, - "server_side_encryption_customer_key value is invalid", - ) - .with_context("value", v) - .set_source(e) - })?), - }; let server_side_encryption_customer_key_md5 = match &self.server_side_encryption_customer_key_md5 { None => None, - Some(v) => Some(v.parse().map_err(|e| { - Error::new( - ErrorKind::ConfigInvalid, - "server_side_encryption_customer_key_md5 value is invalid", - ) - .with_context("value", v) - .set_source(e) + Some(v) => Some(build_header_value(v).map_err(|err| { + err.with_context("key", "server_side_encryption_customer_key_md5") })?), }; @@ -899,89 +820,73 @@ impl Builder for S3Builder { })? }; - let cfg = AwsConfigLoader::default(); + let mut cfg = AwsConfig::default(); if !self.disable_config_load { - cfg.load(); + cfg = cfg.from_profile(); + cfg = cfg.from_env(); } // Setting all value from user input if available. - if let Some(region) = &self.region { - cfg.set_region(region); + if let Some(v) = self.region.take() { + cfg.region = Some(v); } - if let Some(v) = &self.access_key_id { - cfg.set_access_key_id(v); + if let Some(v) = self.access_key_id.take() { + cfg.access_key_id = Some(v) } - if let Some(v) = &self.secret_access_key { - cfg.set_secret_access_key(v); + if let Some(v) = self.secret_access_key.take() { + cfg.secret_access_key = Some(v) } - if let Some(v) = &self.security_token { - cfg.set_session_token(v); + if let Some(v) = self.security_token.take() { + cfg.session_token = Some(v) } - if let Some(v) = &self.role_arn { - cfg.set_role_arn(v); + if let Some(v) = self.role_arn.take() { + cfg.role_arn = Some(v) } - if let Some(v) = &self.external_id { - cfg.set_external_id(v); + if let Some(v) = self.external_id.take() { + cfg.external_id = Some(v) } - // Calculate region based on current cfg. - let region = match cfg.region() { - Some(region) => region, - None => { - // region is required to make signer work. - // - // If we don't know region after loading from builder and env. - // We will use `us-east-1` as default. - let region = "us-east-1".to_string(); - cfg.set_region(®ion); - - region - } - }; + if cfg.region.is_none() { + // region is required to make signer work. + // + // If we don't know region after loading from builder and env. + // We will use `us-east-1` as default. + cfg.region = Some("us-east-1".to_string()); + } + + let region = cfg.region.to_owned().unwrap(); debug!("backend use region: {region}"); // Building endpoint. 
let endpoint = self.build_endpoint(®ion); debug!("backend use endpoint: {endpoint}"); - let mut signer_builder = AwsV4Signer::builder(); - signer_builder.service("s3"); - signer_builder.allow_anonymous(); - signer_builder.config_loader(cfg.clone()); - signer_builder.credential_loader({ - let mut cred_loader = AwsCredentialLoader::new(cfg); - cred_loader = cred_loader.with_allow_anonymous(); - if self.disable_config_load { - // If load config has been disable, we should also disable - // ec2 metadata to avoid leaking permits. - cred_loader = cred_loader.with_disable_ec2_metadata(); - } - - if let Some(ccl) = &self.customed_credential_load { - cred_loader = cred_loader.with_customed_credential_loader(ccl.clone()); - } - - cred_loader - }); + let mut loader = AwsLoader::new(client.client(), cfg).with_allow_anonymous(); + if self.disable_ec2_metadata { + loader = loader.with_disable_ec2_metadata(); + } + if let Some(v) = self.customed_credential_load.take() { + loader = loader.with_customed_credential_loader(v); + } - let signer = signer_builder - .build() - .map_err(|e| Error::new(ErrorKind::Unexpected, "build AwsV4Signer").set_source(e))?; + let signer = AwsV4Signer::new("s3", ®ion); - debug!("backend build finished: {:?}", &self); + debug!("backend build finished"); Ok(S3Backend { - root, - endpoint, - signer: Arc::new(signer), - bucket: self.bucket.clone(), - client, - - server_side_encryption, - server_side_encryption_aws_kms_key_id, - server_side_encryption_customer_algorithm, - server_side_encryption_customer_key, - server_side_encryption_customer_key_md5, - default_storage_class, + core: Arc::new(S3Core { + bucket: bucket.to_string(), + endpoint, + root, + server_side_encryption, + server_side_encryption_aws_kms_key_id, + server_side_encryption_customer_algorithm, + server_side_encryption_customer_key, + server_side_encryption_customer_key_md5, + default_storage_class, + signer, + loader, + client, + }), }) } } @@ -989,82 +894,7 @@ impl Builder for S3Builder { /// Backend for s3 services. #[derive(Debug, Clone)] pub struct S3Backend { - bucket: String, - endpoint: String, - pub signer: Arc, - pub client: HttpClient, - // root will be "/" or "/abc/" - root: String, - - server_side_encryption: Option, - server_side_encryption_aws_kms_key_id: Option, - server_side_encryption_customer_algorithm: Option, - server_side_encryption_customer_key: Option, - server_side_encryption_customer_key_md5: Option, - default_storage_class: Option, -} - -impl S3Backend { - /// # Note - /// - /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while - // get or stat. 
- pub(crate) fn insert_sse_headers( - &self, - mut req: http::request::Builder, - is_write: bool, - ) -> http::request::Builder { - if is_write { - if let Some(v) = &self.server_side_encryption { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION), - v, - ) - } - if let Some(v) = &self.server_side_encryption_aws_kms_key_id { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID), - v, - ) - } - } - - if let Some(v) = &self.server_side_encryption_customer_algorithm { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM), - v, - ) - } - if let Some(v) = &self.server_side_encryption_customer_key { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY), - v, - ) - } - if let Some(v) = &self.server_side_encryption_customer_key_md5 { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5), - v, - ) - } - - req - } + core: Arc, } #[async_trait] @@ -1082,8 +912,8 @@ impl Accessor for S3Backend { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::S3) - .set_root(&self.root) - .set_name(&self.bucket) + .set_root(&self.core.root) + .set_name(&self.core.bucket) .set_max_batch_operations(1000) .set_capabilities(Read | Write | List | Scan | Presign | Batch | Copy) .set_hints(ReadStreamable); @@ -1093,11 +923,12 @@ impl Accessor for S3Backend { async fn create(&self, path: &str, _: OpCreate) -> Result { let mut req = - self.s3_put_object_request(path, Some(0), None, None, None, AsyncBody::Empty)?; + self.core + .s3_put_object_request(path, Some(0), None, None, None, AsyncBody::Empty)?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.client.send(req).await?; + let resp = self.core.send(req).await?; let status = resp.status(); @@ -1112,6 +943,7 @@ impl Accessor for S3Backend { async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let resp = self + .core .s3_get_object(path, args.range(), args.if_none_match()) .await?; @@ -1129,6 +961,7 @@ impl Accessor for S3Backend { async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { let upload_id = if args.append() { let resp = self + .core .s3_initiate_multipart_upload( path, args.content_type(), @@ -1157,12 +990,12 @@ impl Accessor for S3Backend { Ok(( RpWrite::default(), - S3Writer::new(self.clone(), args, path.to_string(), upload_id), + S3Writer::new(self.core.clone(), args, path.to_string(), upload_id), )) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { - let resp = self.s3_copy_object(from, to).await?; + let resp = self.core.s3_copy_object(from, to).await?; let status = resp.status(); @@ -1184,7 +1017,7 @@ impl Accessor for S3Backend { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); } - let resp = self.s3_head_object(path, args.if_none_match()).await?; + let resp = self.core.s3_head_object(path, args.if_none_match()).await?; let status = resp.status(); @@ -1198,7 +1031,7 @@ impl Accessor for S3Backend { } async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.s3_delete_object(path).await?; + let 
resp = self.core.s3_delete_object(path).await?; let status = resp.status(); @@ -1211,22 +1044,24 @@ impl Accessor for S3Backend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - S3Pager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + S3Pager::new(self.core.clone(), path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - S3Pager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + S3Pager::new(self.core.clone(), path, "", args.limit()), )) } async fn presign(&self, path: &str, args: OpPresign) -> Result { // We will not send this request out, just for signing. let mut req = match args.operation() { - PresignOperation::Stat(v) => self.s3_head_object_request(path, v.if_none_match())?, - PresignOperation::Read(v) => self.s3_get_object_request( + PresignOperation::Stat(v) => { + self.core.s3_head_object_request(path, v.if_none_match())? + } + PresignOperation::Read(v) => self.core.s3_get_object_request( path, v.range(), v.override_content_disposition(), @@ -1234,13 +1069,12 @@ impl Accessor for S3Backend { v.if_none_match(), )?, PresignOperation::Write(_) => { - self.s3_put_object_request(path, None, None, None, None, AsyncBody::Empty)? + self.core + .s3_put_object_request(path, None, None, None, None, AsyncBody::Empty)? } }; - self.signer - .sign_query(&mut req, args.expire()) - .map_err(new_request_sign_error)?; + self.core.sign_query(&mut req, args.expire()).await?; // We don't need this request anymore, consume it directly. let (parts, _) = req.into_parts(); @@ -1264,7 +1098,7 @@ impl Accessor for S3Backend { let paths = ops.into_iter().map(|(p, _)| p).collect(); - let resp = self.s3_delete_objects(paths).await?; + let resp = self.core.s3_delete_objects(paths).await?; let status = resp.status(); @@ -1276,12 +1110,12 @@ impl Accessor for S3Backend { let mut batched_result = Vec::with_capacity(result.deleted.len() + result.error.len()); for i in result.deleted { - let path = build_rel_path(&self.root, &i.key); + let path = build_rel_path(&self.core.root, &i.key); batched_result.push((path, Ok(RpDelete::default().into()))); } // TODO: we should handle those errors with code. 
for i in result.error { - let path = build_rel_path(&self.root, &i.key); + let path = build_rel_path(&self.core.root, &i.key); batched_result.push(( path, @@ -1296,519 +1130,8 @@ impl Accessor for S3Backend { } } -impl S3Backend { - fn s3_head_object_request( - &self, - path: &str, - if_none_match: Option<&str>, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::head(&url); - - req = self.insert_sse_headers(req, false); - - if let Some(if_none_match) = if_none_match { - req = req.header(http::header::IF_NONE_MATCH, if_none_match); - } - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - fn s3_get_object_request( - &self, - path: &str, - range: BytesRange, - override_content_disposition: Option<&str>, - override_cache_control: Option<&str>, - if_none_match: Option<&str>, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - // Construct headers to add to the request - let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - // Add query arguments to the URL based on response overrides - let mut query_args = Vec::new(); - if let Some(override_content_disposition) = override_content_disposition { - query_args.push(format!( - "{}={}", - constants::RESPONSE_CONTENT_DISPOSITION, - percent_encode_path(override_content_disposition) - )) - } - if let Some(override_cache_control) = override_cache_control { - query_args.push(format!( - "{}={}", - constants::RESPONSE_CACHE_CONTROL, - percent_encode_path(override_cache_control) - )) - } - if !query_args.is_empty() { - url.push_str(&format!("?{}", query_args.join("&"))); - } - - let mut req = Request::get(&url); - - if !range.is_full() { - req = req.header(http::header::RANGE, range.to_header()); - } - - if let Some(if_none_match) = if_none_match { - req = req.header(http::header::IF_NONE_MATCH, if_none_match); - } - - // Set SSE headers. - // TODO: how will this work with presign? - req = self.insert_sse_headers(req, false); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - async fn s3_get_object( - &self, - path: &str, - range: BytesRange, - if_none_match: Option<&str>, - ) -> Result> { - let mut req = self.s3_get_object_request(path, range, None, None, if_none_match)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub fn s3_put_object_request( - &self, - path: &str, - size: Option, - content_type: Option<&str>, - content_disposition: Option<&str>, - cache_control: Option<&str>, - body: AsyncBody, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size) - } - - if let Some(mime) = content_type { - req = req.header(CONTENT_TYPE, mime) - } - - if let Some(pos) = content_disposition { - req = req.header(CONTENT_DISPOSITION, pos) - } - - if let Some(cache_control) = cache_control { - req = req.header(CACHE_CONTROL, cache_control) - } - - // Set storage class header - if let Some(v) = &self.default_storage_class { - req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); - } - - // Set SSE headers. 
- req = self.insert_sse_headers(req, true); - - // Set body - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - async fn s3_head_object( - &self, - path: &str, - if_none_match: Option<&str>, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::head(&url); - - // Set SSE headers. - req = self.insert_sse_headers(req, false); - - if let Some(if_none_match) = if_none_match { - req = req.header(http::header::IF_NONE_MATCH, if_none_match); - } - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn s3_delete_object(&self, path: &str) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::delete(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn s3_copy_object(&self, from: &str, to: &str) -> Result> { - let from = build_abs_path(&self.root, from); - let to = build_abs_path(&self.root, to); - - let source = format!("{}/{}", self.bucket, percent_encode_path(&from)); - let target = format!("{}/{}", self.endpoint, percent_encode_path(&to)); - - let mut req = Request::put(&target); - - // Set SSE headers. - req = self.insert_sse_headers(req, true); - - if let Some(v) = &self.server_side_encryption_customer_algorithm { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static( - constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM, - ), - v, - ) - } - - if let Some(v) = &self.server_side_encryption_customer_key { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static( - constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY, - ), - v, - ) - } - - if let Some(v) = &self.server_side_encryption_customer_key_md5 { - let mut v = v.clone(); - v.set_sensitive(true); - - req = req.header( - HeaderName::from_static( - constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5, - ), - v, - ) - } - - let mut req = req - .header(constants::X_AMZ_COPY_SOURCE, percent_encode_path(&source)) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - /// Make this functions as `pub(suber)` because `DirStream` depends - /// on this. - pub(super) async fn s3_list_objects( - &self, - path: &str, - continuation_token: &str, - delimiter: &str, - limit: Option, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let mut url = format!( - "{}?list-type=2&delimiter={delimiter}&prefix={}", - self.endpoint, - percent_encode_path(&p) - ); - if let Some(limit) = limit { - write!(url, "&max-keys={limit}").expect("write into string must succeed"); - } - if !continuation_token.is_empty() { - // AWS S3 could return continuation-token that contains `=` - // which could lead `reqsign` parse query wrongly. - // URL encode continuation-token before starting signing so that - // our signer will not be confused. 
- write!( - url, - "&continuation-token={}", - percent_encode_path(continuation_token) - ) - .expect("write into string must succeed"); - } - - let mut req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn s3_initiate_multipart_upload( - &self, - path: &str, - content_type: Option<&str>, - content_disposition: Option<&str>, - cache_control: Option<&str>, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p)); - - let mut req = Request::post(&url); - - if let Some(mime) = content_type { - req = req.header(CONTENT_TYPE, mime) - } - - if let Some(content_disposition) = content_disposition { - req = req.header(CONTENT_DISPOSITION, content_disposition) - } - - if let Some(cache_control) = cache_control { - req = req.header(CACHE_CONTROL, cache_control) - } - - // Set storage class header - if let Some(v) = &self.default_storage_class { - req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); - } - - // Set SSE headers. - let req = self.insert_sse_headers(req, true); - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub fn s3_upload_part_request( - &self, - path: &str, - upload_id: &str, - part_number: usize, - size: Option, - body: AsyncBody, - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}?partNumber={}&uploadId={}", - self.endpoint, - percent_encode_path(&p), - part_number, - percent_encode_path(upload_id) - ); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size); - } - - // Set SSE headers. - req = self.insert_sse_headers(req, true); - - // Set body - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - pub async fn s3_complete_multipart_upload( - &self, - path: &str, - upload_id: &str, - parts: &[CompleteMultipartUploadRequestPart], - ) -> Result> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}?uploadId={}", - self.endpoint, - percent_encode_path(&p), - percent_encode_path(upload_id) - ); - - let req = Request::post(&url); - - // Set SSE headers. - let req = self.insert_sse_headers(req, true); - - let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { - part: parts.to_vec(), - }) - .map_err(new_xml_deserialize_error)?; - // Make sure content length has been set to avoid post with chunked encoding. - let req = req.header(CONTENT_LENGTH, content.len()); - // Set content-type to `application/xml` to avoid mixed with form post. 
- let req = req.header(CONTENT_TYPE, "application/xml"); - - let mut req = req - .body(AsyncBody::Bytes(Bytes::from(content))) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn s3_delete_objects(&self, paths: Vec) -> Result> { - let url = format!("{}/?delete", self.endpoint); - - let req = Request::post(&url); - - let content = quick_xml::se::to_string(&DeleteObjectsRequest { - object: paths - .into_iter() - .map(|path| DeleteObjectsRequestObject { - key: build_abs_path(&self.root, &path), - }) - .collect(), - }) - .map_err(new_xml_deserialize_error)?; - - // Make sure content length has been set to avoid post with chunked encoding. - let req = req.header(CONTENT_LENGTH, content.len()); - // Set content-type to `application/xml` to avoid mixed with form post. - let req = req.header(CONTENT_TYPE, "application/xml"); - // Set content-md5 as required by API. - let req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes())); - - let mut req = req - .body(AsyncBody::Bytes(Bytes::from(content))) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } -} - -/// Result of CreateMultipartUpload -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename_all = "PascalCase")] -struct InitiateMultipartUploadResult { - upload_id: String, -} - -/// Request of CompleteMultipartUploadRequest -#[derive(Default, Debug, Serialize)] -#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")] -struct CompleteMultipartUploadRequest { - part: Vec, -} - -#[derive(Clone, Default, Debug, Serialize)] -#[serde(default, rename_all = "PascalCase")] -pub struct CompleteMultipartUploadRequestPart { - #[serde(rename = "PartNumber")] - pub part_number: usize, - /// # TODO - /// - /// quick-xml will do escape on `"` which leads to our serialized output is - /// not the same as aws s3's example. - /// - /// Ideally, we could use `serialize_with` to address this (buf failed) - /// - /// ```ignore - /// #[derive(Default, Debug, Serialize)] - /// #[serde(default, rename_all = "PascalCase")] - /// struct CompleteMultipartUploadRequestPart { - /// #[serde(rename = "PartNumber")] - /// part_number: usize, - /// #[serde(rename = "ETag", serialize_with = "partial_escape")] - /// etag: String, - /// } - /// - /// fn partial_escape(s: &str, ser: S) -> std::result::Result - /// where - /// S: serde::Serializer, - /// { - /// ser.serialize_str(&String::from_utf8_lossy( - /// &quick_xml::escape::partial_escape(s.as_bytes()), - /// )) - /// } - /// ``` - /// - /// ref: - #[serde(rename = "ETag")] - pub etag: String, -} - -/// Request of DeleteObjects. -#[derive(Default, Debug, Serialize)] -#[serde(default, rename = "Delete", rename_all = "PascalCase")] -struct DeleteObjectsRequest { - object: Vec, -} - -#[derive(Default, Debug, Serialize)] -#[serde(rename_all = "PascalCase")] -struct DeleteObjectsRequestObject { - key: String, -} - -/// Result of DeleteObjects. 
-#[derive(Default, Debug, Deserialize)] -#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")] -struct DeleteObjectsResult { - deleted: Vec, - error: Vec, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -struct DeleteObjectsResultDeleted { - key: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default, rename_all = "PascalCase")] -struct DeleteObjectsResultError { - code: String, - key: String, - message: String, -} - #[cfg(test)] mod tests { - use bytes::Buf; - use bytes::Bytes; - use super::*; #[test] @@ -1866,129 +1189,4 @@ mod tests { assert_eq!(endpoint, "https://test.s3.us-east-2.amazonaws.com"); } } - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_Examples - #[test] - fn test_deserialize_initiate_multipart_upload_result() { - let bs = Bytes::from( - r#" - - example-bucket - example-object - VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA - "#, - ); - - let out: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - - assert_eq!( - out.upload_id, - "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA" - ) - } - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples - #[test] - fn test_serialize_complete_multipart_upload_request() { - let req = CompleteMultipartUploadRequest { - part: vec![ - CompleteMultipartUploadRequestPart { - part_number: 1, - etag: "\"a54357aff0632cce46d942af68356b38\"".to_string(), - }, - CompleteMultipartUploadRequestPart { - part_number: 2, - etag: "\"0c78aef83f66abc1fa1e8477f296d394\"".to_string(), - }, - CompleteMultipartUploadRequestPart { - part_number: 3, - etag: "\"acbd18db4cc2f85cedef654fccc4a4d8\"".to_string(), - }, - ], - }; - - let actual = quick_xml::se::to_string(&req).expect("must succeed"); - - pretty_assertions::assert_eq!( - actual, - r#" - - 1 - "a54357aff0632cce46d942af68356b38" - - - 2 - "0c78aef83f66abc1fa1e8477f296d394" - - - 3 - "acbd18db4cc2f85cedef654fccc4a4d8" - - "# - // Cleanup space and new line - .replace([' ', '\n'], "") - // Escape `"` by hand to address - .replace('"', """) - ) - } - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples - #[test] - fn test_serialize_delete_objects_request() { - let req = DeleteObjectsRequest { - object: vec![ - DeleteObjectsRequestObject { - key: "sample1.txt".to_string(), - }, - DeleteObjectsRequestObject { - key: "sample2.txt".to_string(), - }, - ], - }; - - let actual = quick_xml::se::to_string(&req).expect("must succeed"); - - pretty_assertions::assert_eq!( - actual, - r#" - - sample1.txt - - - sample2.txt - - "# - // Cleanup space and new line - .replace([' ', '\n'], "") - ) - } - - /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples - #[test] - fn test_deserialize_delete_objects_result() { - let bs = Bytes::from( - r#" - - - sample1.txt - - - sample2.txt - AccessDenied - Access Denied - - "#, - ); - - let out: DeleteObjectsResult = - quick_xml::de::from_reader(bs.reader()).expect("must success"); - - assert_eq!(out.deleted.len(), 1); - assert_eq!(out.deleted[0].key, "sample1.txt"); - assert_eq!(out.error.len(), 1); - assert_eq!(out.error[0].key, "sample2.txt"); - assert_eq!(out.error[0].code, "AccessDenied"); - assert_eq!(out.error[0].message, "Access 
Denied"); - } } diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs new file mode 100644 index 000000000000..58b19a4a4bb1 --- /dev/null +++ b/core/src/services/s3/core.rs @@ -0,0 +1,849 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::fmt::Write; +use std::time::Duration; + +use backon::ExponentialBuilder; +use backon::Retryable; +use bytes::Bytes; +use http::header::HeaderName; +use http::header::CACHE_CONTROL; +use http::header::CONTENT_DISPOSITION; +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; +use http::HeaderValue; +use http::Request; +use http::Response; +use once_cell::sync::Lazy; +use reqsign_0_9::AwsCredential; +use reqsign_0_9::AwsLoader; +use reqsign_0_9::AwsV4Signer; +use serde::Deserialize; +use serde::Serialize; + +use crate::raw::*; +use crate::*; + +mod constants { + pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source"; + + pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption"; + pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = + "x-amz-server-side-encryption-customer-algorithm"; + pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = + "x-amz-server-side-encryption-customer-key"; + pub const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = + "x-amz-server-side-encryption-customer-key-md5"; + pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str = + "x-amz-server-side-encryption-aws-kms-key-id"; + pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class"; + + pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: &str = + "x-amz-copy-source-server-side-encryption-customer-algorithm"; + pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str = + "x-amz-copy-source-server-side-encryption-customer-key"; + pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = + "x-amz-copy-source-server-side-encryption-customer-key-md5"; + + pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition"; + pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control"; +} + +static BACKOFF: Lazy = + Lazy::new(|| ExponentialBuilder::default().with_jitter()); + +pub struct S3Core { + pub bucket: String, + pub endpoint: String, + pub root: String, + pub server_side_encryption: Option, + pub server_side_encryption_aws_kms_key_id: Option, + pub server_side_encryption_customer_algorithm: Option, + pub server_side_encryption_customer_key: Option, + pub server_side_encryption_customer_key_md5: Option, + pub default_storage_class: Option, + + pub signer: AwsV4Signer, + pub loader: AwsLoader, + pub client: HttpClient, +} + +impl Debug for S3Core { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + 
f.debug_struct("S3Core") + .field("bucket", &self.bucket) + .field("endpoint", &self.endpoint) + .field("root", &self.root) + .finish_non_exhaustive() + } +} + +impl S3Core { + /// If credential is not found, we will not sign the request. + async fn load_credential(&self) -> Result> { + let cred = { || self.loader.load() } + .retry(&*BACKOFF) + .await + .map_err(new_request_credential_error)?; + + if let Some(cred) = cred { + Ok(Some(cred)) + } else { + Ok(None) + } + } + + pub async fn sign(&self, req: &mut Request) -> Result<()> { + let cred = if let Some(cred) = self.load_credential().await? { + cred + } else { + return Ok(()); + }; + + self.signer.sign(req, &cred).map_err(new_request_sign_error) + } + + pub async fn sign_query(&self, req: &mut Request, durtion: Duration) -> Result<()> { + let cred = if let Some(cred) = self.load_credential().await? { + cred + } else { + return Ok(()); + }; + + self.signer + .sign_query(req, durtion, &cred) + .map_err(new_request_sign_error) + } + + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } + + /// # Note + /// + /// header like X_AMZ_SERVER_SIDE_ENCRYPTION doesn't need to set while + // get or stat. + pub fn insert_sse_headers( + &self, + mut req: http::request::Builder, + is_write: bool, + ) -> http::request::Builder { + if is_write { + if let Some(v) = &self.server_side_encryption { + let mut v = v.clone(); + v.set_sensitive(true); + + req = req.header( + HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION), + v, + ) + } + if let Some(v) = &self.server_side_encryption_aws_kms_key_id { + let mut v = v.clone(); + v.set_sensitive(true); + + req = req.header( + HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID), + v, + ) + } + } + + if let Some(v) = &self.server_side_encryption_customer_algorithm { + let mut v = v.clone(); + v.set_sensitive(true); + + req = req.header( + HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM), + v, + ) + } + if let Some(v) = &self.server_side_encryption_customer_key { + let mut v = v.clone(); + v.set_sensitive(true); + + req = req.header( + HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY), + v, + ) + } + if let Some(v) = &self.server_side_encryption_customer_key_md5 { + let mut v = v.clone(); + v.set_sensitive(true); + + req = req.header( + HeaderName::from_static(constants::X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5), + v, + ) + } + + req + } +} + +impl S3Core { + pub fn s3_head_object_request( + &self, + path: &str, + if_none_match: Option<&str>, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + + let mut req = Request::head(&url); + + req = self.insert_sse_headers(req, false); + + if let Some(if_none_match) = if_none_match { + req = req.header(http::header::IF_NONE_MATCH, if_none_match); + } + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + pub fn s3_get_object_request( + &self, + path: &str, + range: BytesRange, + override_content_disposition: Option<&str>, + override_cache_control: Option<&str>, + if_none_match: Option<&str>, + ) -> Result> { + let p = build_abs_path(&self.root, path); + + // Construct headers to add to the request + let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + + // Add query arguments to the URL based on response overrides + let mut query_args = Vec::new(); + if let 
+            query_args.push(format!(
+                "{}={}",
+                constants::RESPONSE_CONTENT_DISPOSITION,
+                percent_encode_path(override_content_disposition)
+            ))
+        }
+        if let Some(override_cache_control) = override_cache_control {
+            query_args.push(format!(
+                "{}={}",
+                constants::RESPONSE_CACHE_CONTROL,
+                percent_encode_path(override_cache_control)
+            ))
+        }
+        if !query_args.is_empty() {
+            url.push_str(&format!("?{}", query_args.join("&")));
+        }
+
+        let mut req = Request::get(&url);
+
+        if !range.is_full() {
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        if let Some(if_none_match) = if_none_match {
+            req = req.header(http::header::IF_NONE_MATCH, if_none_match);
+        }
+
+        // Set SSE headers.
+        // TODO: how will this work with presign?
+        req = self.insert_sse_headers(req, false);
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn s3_get_object(
+        &self,
+        path: &str,
+        range: BytesRange,
+        if_none_match: Option<&str>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = self.s3_get_object_request(path, range, None, None, if_none_match)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub fn s3_put_object_request(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        content_disposition: Option<&str>,
+        cache_control: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut req = Request::put(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        if let Some(mime) = content_type {
+            req = req.header(CONTENT_TYPE, mime)
+        }
+
+        if let Some(pos) = content_disposition {
+            req = req.header(CONTENT_DISPOSITION, pos)
+        }
+
+        if let Some(cache_control) = cache_control {
+            req = req.header(CACHE_CONTROL, cache_control)
+        }
+
+        // Set storage class header
+        if let Some(v) = &self.default_storage_class {
+            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
+        }
+
+        // Set SSE headers.
+        req = self.insert_sse_headers(req, true);
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn s3_head_object(
+        &self,
+        path: &str,
+        if_none_match: Option<&str>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut req = Request::head(&url);
+
+        // Set SSE headers.
+        req = self.insert_sse_headers(req, false);
+
+        if let Some(if_none_match) = if_none_match {
+            req = req.header(http::header::IF_NONE_MATCH, if_none_match);
+        }
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn s3_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
+
+        let mut req = Request::delete(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn s3_copy_object(
+        &self,
+        from: &str,
+        to: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let from = build_abs_path(&self.root, from);
+        let to = build_abs_path(&self.root, to);
+
+        let source = format!("{}/{}", self.bucket, percent_encode_path(&from));
+        let target = format!("{}/{}", self.endpoint, percent_encode_path(&to));
+
+        let mut req = Request::put(&target);
+
+        // Set SSE headers.
+        req = self.insert_sse_headers(req, true);
+
+        if let Some(v) = &self.server_side_encryption_customer_algorithm {
+            let mut v = v.clone();
+            v.set_sensitive(true);
+
+            req = req.header(
+                HeaderName::from_static(
+                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
+                ),
+                v,
+            )
+        }
+
+        if let Some(v) = &self.server_side_encryption_customer_key {
+            let mut v = v.clone();
+            v.set_sensitive(true);
+
+            req = req.header(
+                HeaderName::from_static(
+                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
+                ),
+                v,
+            )
+        }
+
+        if let Some(v) = &self.server_side_encryption_customer_key_md5 {
+            let mut v = v.clone();
+            v.set_sensitive(true);
+
+            req = req.header(
+                HeaderName::from_static(
+                    constants::X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
+                ),
+                v,
+            )
+        }
+
+        let mut req = req
+            .header(constants::X_AMZ_COPY_SOURCE, percent_encode_path(&source))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    /// Make this function `pub(super)` because `DirStream` depends
+    /// on it.
+    pub async fn s3_list_objects(
+        &self,
+        path: &str,
+        continuation_token: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let mut url = format!(
+            "{}?list-type=2&delimiter={delimiter}&prefix={}",
+            self.endpoint,
+            percent_encode_path(&p)
+        );
+        if let Some(limit) = limit {
+            write!(url, "&max-keys={limit}").expect("write into string must succeed");
+        }
+        if !continuation_token.is_empty() {
+            // AWS S3 could return a continuation-token that contains `=`,
+            // which could lead `reqsign` to parse the query wrongly.
+            // URL encode the continuation-token before signing so that
+            // our signer will not be confused.
+            write!(
+                url,
+                "&continuation-token={}",
+                percent_encode_path(continuation_token)
+            )
+            .expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn s3_initiate_multipart_upload(
+        &self,
+        path: &str,
+        content_type: Option<&str>,
+        content_disposition: Option<&str>,
+        cache_control: Option<&str>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));
+
+        let mut req = Request::post(&url);
+
+        if let Some(mime) = content_type {
+            req = req.header(CONTENT_TYPE, mime)
+        }
+
+        if let Some(content_disposition) = content_disposition {
+            req = req.header(CONTENT_DISPOSITION, content_disposition)
+        }
+
+        if let Some(cache_control) = cache_control {
+            req = req.header(CACHE_CONTROL, cache_control)
+        }
+
+        // Set storage class header
+        if let Some(v) = &self.default_storage_class {
+            req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
+        }
+
+        // Set SSE headers.
+        let req = self.insert_sse_headers(req, true);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub fn s3_upload_part_request(
+        &self,
+        path: &str,
+        upload_id: &str,
+        part_number: usize,
+        size: Option<usize>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}?partNumber={}&uploadId={}",
+            self.endpoint,
+            percent_encode_path(&p),
+            part_number,
+            percent_encode_path(upload_id)
+        );
+
+        let mut req = Request::put(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size);
+        }
+
+        // Set SSE headers.
+        req = self.insert_sse_headers(req, true);
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn s3_complete_multipart_upload(
+        &self,
+        path: &str,
+        upload_id: &str,
+        parts: &[CompleteMultipartUploadRequestPart],
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}?uploadId={}",
+            self.endpoint,
+            percent_encode_path(&p),
+            percent_encode_path(upload_id)
+        );
+
+        let req = Request::post(&url);
+
+        // Set SSE headers.
+        let req = self.insert_sse_headers(req, true);
+
+        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
+            part: parts.to_vec(),
+        })
+        .map_err(new_xml_deserialize_error)?;
+        // Make sure content length has been set to avoid post with chunked encoding.
+        let req = req.header(CONTENT_LENGTH, content.len());
+        // Set content-type to `application/xml` to avoid it being mixed with form posts.
+        let req = req.header(CONTENT_TYPE, "application/xml");
+
+        let mut req = req
+            .body(AsyncBody::Bytes(Bytes::from(content)))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn s3_delete_objects(
+        &self,
+        paths: Vec<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!("{}/?delete", self.endpoint);
+
+        let req = Request::post(&url);
+
+        let content = quick_xml::se::to_string(&DeleteObjectsRequest {
+            object: paths
+                .into_iter()
+                .map(|path| DeleteObjectsRequestObject {
+                    key: build_abs_path(&self.root, &path),
+                })
+                .collect(),
+        })
+        .map_err(new_xml_deserialize_error)?;
+
+        // Make sure content length has been set to avoid post with chunked encoding.
+        let req = req.header(CONTENT_LENGTH, content.len());
+        // Set content-type to `application/xml` to avoid it being handled as a form post.
+        let req = req.header(CONTENT_TYPE, "application/xml");
+        // Set content-md5 as required by the API.
+        let req = req.header("CONTENT-MD5", format_content_md5(content.as_bytes()));
+
+        let mut req = req
+            .body(AsyncBody::Bytes(Bytes::from(content)))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+}
+
+/// Result of CreateMultipartUpload
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct InitiateMultipartUploadResult {
+    pub upload_id: String,
+}
+
+/// Request of CompleteMultipartUpload
+#[derive(Default, Debug, Serialize)]
+#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
+pub struct CompleteMultipartUploadRequest {
+    pub part: Vec<CompleteMultipartUploadRequestPart>,
+}
+
+#[derive(Clone, Default, Debug, Serialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct CompleteMultipartUploadRequestPart {
+    #[serde(rename = "PartNumber")]
+    pub part_number: usize,
+    /// # TODO
+    ///
+    /// quick-xml will escape `"`, which makes our serialized output differ
+    /// from AWS S3's example.
+    ///
+    /// Ideally, we could use `serialize_with` to address this (but it failed):
+    ///
+    /// ```ignore
+    /// #[derive(Default, Debug, Serialize)]
+    /// #[serde(default, rename_all = "PascalCase")]
+    /// struct CompleteMultipartUploadRequestPart {
+    ///     #[serde(rename = "PartNumber")]
+    ///     part_number: usize,
+    ///     #[serde(rename = "ETag", serialize_with = "partial_escape")]
+    ///     etag: String,
+    /// }
+    ///
+    /// fn partial_escape<S>(s: &str, ser: S) -> std::result::Result<S::Ok, S::Error>
+    /// where
+    ///     S: serde::Serializer,
+    /// {
+    ///     ser.serialize_str(&String::from_utf8_lossy(
+    ///         &quick_xml::escape::partial_escape(s.as_bytes()),
+    ///     ))
+    /// }
+    /// ```
+    ///
+    /// ref: 
+    #[serde(rename = "ETag")]
+    pub etag: String,
+}
+
+/// Request of DeleteObjects.
+#[derive(Default, Debug, Serialize)]
+#[serde(default, rename = "Delete", rename_all = "PascalCase")]
+pub struct DeleteObjectsRequest {
+    pub object: Vec<DeleteObjectsRequestObject>,
+}
+
+#[derive(Default, Debug, Serialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectsRequestObject {
+    pub key: String,
+}
+
+/// Result of DeleteObjects.
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename = "DeleteResult", rename_all = "PascalCase")]
+pub struct DeleteObjectsResult {
+    pub deleted: Vec<DeleteObjectsResultDeleted>,
+    pub error: Vec<DeleteObjectsResultError>,
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectsResultDeleted {
+    pub key: String,
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default, rename_all = "PascalCase")]
+pub struct DeleteObjectsResultError {
+    pub code: String,
+    pub key: String,
+    pub message: String,
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::Buf;
+    use bytes::Bytes;
+
+    use super::*;
+
+    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_Examples
+    #[test]
+    fn test_deserialize_initiate_multipart_upload_result() {
+        let bs = Bytes::from(
+            r#"<?xml version="1.0" encoding="UTF-8"?>
+            <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+              <Bucket>example-bucket</Bucket>
+              <Key>example-object</Key>
+              <UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
+            </InitiateMultipartUploadResult>"#,
+        );
+
+        let out: InitiateMultipartUploadResult =
+            quick_xml::de::from_reader(bs.reader()).expect("must succeed");
+
+        assert_eq!(
+            out.upload_id,
+            "VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"
+        )
+    }
+
+    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_Examples
+    #[test]
+    fn test_serialize_complete_multipart_upload_request() {
+        let req = CompleteMultipartUploadRequest {
+            part: vec![
+                CompleteMultipartUploadRequestPart {
+                    part_number: 1,
+                    etag: "\"a54357aff0632cce46d942af68356b38\"".to_string(),
+                },
+                CompleteMultipartUploadRequestPart {
+                    part_number: 2,
+                    etag: "\"0c78aef83f66abc1fa1e8477f296d394\"".to_string(),
+                },
+                CompleteMultipartUploadRequestPart {
+                    part_number: 3,
+                    etag: "\"acbd18db4cc2f85cedef654fccc4a4d8\"".to_string(),
+                },
+            ],
+        };
+
+        let actual = quick_xml::se::to_string(&req).expect("must succeed");
+
+        pretty_assertions::assert_eq!(
+            actual,
+            r#"<CompleteMultipartUpload>
+            <Part>
+                <PartNumber>1</PartNumber>
+                <ETag>"a54357aff0632cce46d942af68356b38"</ETag>
+            </Part>
+            <Part>
+                <PartNumber>2</PartNumber>
+                <ETag>"0c78aef83f66abc1fa1e8477f296d394"</ETag>
+            </Part>
+            <Part>
+                <PartNumber>3</PartNumber>
+                <ETag>"acbd18db4cc2f85cedef654fccc4a4d8"</ETag>
+            </Part>
+            </CompleteMultipartUpload>"#
+            // Cleanup space and new line
+            .replace([' ', '\n'], "")
+            // Escape `"` by hand to match quick-xml's escaping behavior
+            .replace('"', "&quot;")
+        )
+    }
+
+    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
+    #[test]
+    fn test_serialize_delete_objects_request() {
+        let req = DeleteObjectsRequest {
+            object: vec![
+                DeleteObjectsRequestObject {
+                    key: "sample1.txt".to_string(),
+                },
+                DeleteObjectsRequestObject {
+                    key: "sample2.txt".to_string(),
+                },
+            ],
+        };
+
+        let actual = quick_xml::se::to_string(&req).expect("must succeed");
+
+        pretty_assertions::assert_eq!(
+            actual,
+            r#"<Delete>
+            <Object>
+                <Key>sample1.txt</Key>
+            </Object>
+            <Object>
+                <Key>sample2.txt</Key>
+            </Object>
+            </Delete>"#
+            // Cleanup space and new line
+            .replace([' ', '\n'], "")
+        )
+    }
+
+    /// This example is from https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Examples
+    #[test]
+    fn test_deserialize_delete_objects_result() {
+        let bs = Bytes::from(
+            r#"<?xml version="1.0" encoding="UTF-8"?>
+            <DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+              <Deleted>
+                <Key>sample1.txt</Key>
+              </Deleted>
+              <Error>
+                <Key>sample2.txt</Key>
+                <Code>AccessDenied</Code>
+                <Message>Access Denied</Message>
+              </Error>
+            </DeleteResult>"#,
+        );
+
+        let out: DeleteObjectsResult =
+            quick_xml::de::from_reader(bs.reader()).expect("must succeed");
+
+        assert_eq!(out.deleted.len(), 1);
+        assert_eq!(out.deleted[0].key, "sample1.txt");
+        assert_eq!(out.error.len(), 1);
+        assert_eq!(out.error[0].key, "sample2.txt");
+        assert_eq!(out.error[0].code, "AccessDenied");
+        assert_eq!(out.error[0].message, "Access Denied");
+    }
+}
diff --git a/core/src/services/s3/mod.rs b/core/src/services/s3/mod.rs
index 1a4653354d1b..876dfacb4ebb 100644
--- a/core/src/services/s3/mod.rs
+++ b/core/src/services/s3/mod.rs
@@ -18,6 +18,7 @@
 mod backend;
 pub use backend::S3Builder as S3;
 
+mod core;
 mod error;
 mod pager;
 mod writer;
diff --git a/core/src/services/s3/pager.rs b/core/src/services/s3/pager.rs
index 0d58bd06c0b9..e7036134206a 100644
--- a/core/src/services/s3/pager.rs
+++ b/core/src/services/s3/pager.rs
@@ -24,7 +24,7 @@ use serde::Deserialize;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 
-use super::backend::S3Backend;
+use super::core::S3Core;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::EntryMode;
@@ -34,8 +34,8 @@ use crate::Metadata;
 use crate::Result;
 
 pub struct S3Pager {
-    backend: Arc<S3Backend>,
-    root: String,
+    core: Arc<S3Core>,
+    path: String,
 
     delimiter: String,
     limit: Option<usize>,
@@ -45,16 +45,10 @@
 }
 
 impl S3Pager {
-    pub fn new(
-        backend: Arc<S3Backend>,
-        root: &str,
-        path: &str,
-        delimiter: &str,
-        limit: Option<usize>,
-    ) -> Self {
+    pub fn new(core: Arc<S3Core>, path: &str, delimiter: &str, limit: Option<usize>) -> Self {
         Self {
-            backend,
-            root: root.to_string(),
+            core,
+            path: path.to_string(),
 
             delimiter: delimiter.to_string(),
             limit,
@@ -73,7 +67,7 @@
         }
 
         let resp = self
-            .backend
+            .core
             .s3_list_objects(&self.path, &self.token, &self.delimiter, self.limit)
             .await?;
@@ -103,7 +97,7 @@
 
         for prefix in output.common_prefixes {
             let de = oio::Entry::new(
-                &build_rel_path(&self.root, &prefix.prefix),
+                &build_rel_path(&self.core.root, &prefix.prefix),
                 Metadata::new(EntryMode::DIR),
             );
 
@@ -140,7 +134,7 @@
             })?;
             meta.set_last_modified(dt);
 
-            let de = oio::Entry::new(&build_rel_path(&self.root, &object.key), meta);
+            let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta);
 
             entries.push(de);
         }
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 8114f43d07e6..4609cb2a0ce0 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -15,19 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use http::StatusCode;
 
-use super::backend::CompleteMultipartUploadRequestPart;
-use super::backend::S3Backend;
+use super::core::*;
 use super::error::parse_error;
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
 
 pub struct S3Writer {
-    backend: S3Backend,
+    core: Arc<S3Core>,
 
     op: OpWrite,
     path: String,
@@ -37,9 +38,10 @@
 }
 
 impl S3Writer {
-    pub fn new(backend: S3Backend, op: OpWrite, path: String, upload_id: Option<String>) -> Self {
+    pub fn new(core: Arc<S3Core>, op: OpWrite, path: String, upload_id: Option<String>) -> Self {
         S3Writer {
-            backend,
+            core,
+
             op,
             path,
             upload_id,
@@ -56,7 +58,7 @@ impl oio::Write for S3Writer {
             "Writer initiated with upload id, but users trying to call write, must be buggy"
         );
 
-        let mut req = self.backend.s3_put_object_request(
+        let mut req = self.core.s3_put_object_request(
             &self.path,
             Some(bs.len()),
             self.op.content_type(),
@@ -65,12 +67,9 @@
             AsyncBody::Bytes(bs),
         )?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
 
@@ -90,7 +89,7 @@
         // AWS S3 requires part number must between [1..=10000]
         let part_number = self.parts.len() + 1;
 
-        let mut req = self.backend.s3_upload_part_request(
+        let mut req = self.core.s3_upload_part_request(
             &self.path,
             upload_id,
             part_number,
@@ -98,12 +97,9 @@
             AsyncBody::Bytes(bs),
         )?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
 
@@ -137,7 +133,7 @@
         };
 
         let resp = self
-            .backend
+            .core
             .s3_complete_multipart_upload(&self.path, upload_id, &self.parts)
             .await?;
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 2b1526e2e929..d1069b8f120e 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::ops::RangeBounds;
+use std::time::Duration;
 
 use bytes::Bytes;
 use flagset::FlagSet;
@@ -24,7 +25,6 @@ use futures::AsyncReadExt;
 use futures::Stream;
 use futures::StreamExt;
 use futures::TryStreamExt;
-use time::Duration;
 use tokio::io::ReadBuf;
 
 use super::BlockingOperator;
diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs
index c2818124d605..84089e4489ac 100644
--- a/core/src/types/ops.rs
+++ b/core/src/types/ops.rs
@@ -19,7 +19,7 @@
 //!
 //! By using ops, users can add more context for operation.
 
-use time::Duration;
+use std::time::Duration;
 
 use crate::raw::*;
 use crate::*;
diff --git a/core/tests/behavior/presign.rs b/core/tests/behavior/presign.rs
index 188d781cc79a..8e42f902930e 100644
--- a/core/tests/behavior/presign.rs
+++ b/core/tests/behavior/presign.rs
@@ -16,6 +16,7 @@
 // under the License.
use std::str::FromStr; +use std::time::Duration; use anyhow::Result; use http::header; @@ -25,7 +26,6 @@ use opendal::Operator; use reqwest::Url; use sha2::Digest; use sha2::Sha256; -use time::Duration; use super::utils::*; @@ -84,7 +84,7 @@ pub async fn test_presign_write(op: Operator) -> Result<()> { debug!("Generate a random file: {}", &path); let (content, size) = gen_bytes(); - let signed_req = op.presign_write(&path, Duration::hours(1)).await?; + let signed_req = op.presign_write(&path, Duration::from_secs(3600)).await?; debug!("Generated request: {signed_req:?}"); let client = reqwest::Client::new(); @@ -118,7 +118,7 @@ pub async fn test_presign_stat(op: Operator) -> Result<()> { op.write(&path, content.clone()) .await .expect("write must succeed"); - let signed_req = op.presign_stat(&path, Duration::hours(1)).await?; + let signed_req = op.presign_stat(&path, Duration::from_secs(3600)).await?; debug!("Generated request: {signed_req:?}"); let client = reqwest::Client::new(); let mut req = client.request( @@ -150,7 +150,7 @@ pub async fn test_presign_read(op: Operator) -> Result<()> { .await .expect("write must succeed"); - let signed_req = op.presign_read(&path, Duration::hours(1)).await?; + let signed_req = op.presign_read(&path, Duration::from_secs(3600)).await?; debug!("Generated request: {signed_req:?}"); let client = reqwest::Client::new();
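
Note for downstream users: every presign entry point touched by this patch now takes
`std::time::Duration`, so call sites written against the old `time::Duration` API change
from `Duration::hours(1)` to `Duration::from_secs(3600)`. Below is a minimal sketch of
replaying a presigned read with a plain HTTP client, following the same pattern as the
behavior tests above; the helper name `fetch_presigned` and the `anyhow`/`reqwest`
error plumbing are illustrative, not part of this patch.

    use std::str::FromStr;
    use std::time::Duration;

    use anyhow::Result;
    use opendal::Operator;
    use reqwest::Url;

    // Illustrative helper: presign a read and replay it outside of OpenDAL,
    // mirroring test_presign_read above.
    async fn fetch_presigned(op: &Operator, path: &str) -> Result<bytes::Bytes> {
        // presign_read now takes std::time::Duration (here: one hour).
        let signed_req = op.presign_read(path, Duration::from_secs(3600)).await?;

        let client = reqwest::Client::new();
        let mut req = client.request(
            reqwest::Method::from_str(signed_req.method().as_str())?,
            Url::from_str(&signed_req.uri().to_string())?,
        );
        // The signed headers must be forwarded verbatim, otherwise the
        // signature check fails on the service side.
        for (k, v) in signed_req.header() {
            req = req.header(k, v);
        }

        Ok(req.send().await?.bytes().await?)
    }

The same flow works for presign_write and presign_stat; only the HTTP method and the
request body differ, while the signed headers are always sent along unchanged.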