Skip to content

Commit

Permalink
feat(core): Implement write if not exists for azblob,azdls,gcs,oss,cos (
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Nov 14, 2024
1 parent 8eeec44 commit 084892f
Show file tree
Hide file tree
Showing 15 changed files with 93 additions and 25 deletions.
2 changes: 2 additions & 0 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,8 @@ impl Access for AzblobBackend {
write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,
write_with_if_not_exists: true,
write_with_if_none_match: true,
write_with_user_metadata: true,

delete: true,
Expand Down
42 changes: 26 additions & 16 deletions core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,10 @@ impl AzblobCore {

let mut req = Request::put(&url);

if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
req = req.header(format!("{X_MS_META_PREFIX}{key}"), value)
}
}

// Set SSE headers.
req = self.insert_sse_headers(req);

if let Some(cache_control) = args.cache_control() {
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}
req = req.header(
HeaderName::from_static(constants::X_MS_BLOB_TYPE),
"BlockBlob",
);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
Expand All @@ -266,10 +258,28 @@ impl AzblobCore {
req = req.header(CONTENT_TYPE, ty)
}

req = req.header(
HeaderName::from_static(constants::X_MS_BLOB_TYPE),
"BlockBlob",
);
// Specify the wildcard character (*) to perform the operation only if
// the resource does not exist, and fail the operation if it does exist.
if args.if_not_exists() {
req = req.header(IF_NONE_MATCH, "*");
}

if let Some(v) = args.if_none_match() {
req = req.header(IF_NONE_MATCH, v);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}

// Set SSE headers.
req = self.insert_sse_headers(req);

if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
req = req.header(format!("{X_MS_META_PREFIX}{key}"), value)
}
}

// Set body
let req = req.body(body).map_err(new_request_build_error)?;
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/azblob/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED | StatusCode::CONFLICT => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
Expand Down
3 changes: 3 additions & 0 deletions core/src/services/azdls/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ impl Access for AzdlsBackend {

write: true,
write_can_append: true,
write_with_if_none_match: true,
write_with_if_not_exists: true,

create_dir: true,
delete: true,
rename: true,
Expand Down
10 changes: 9 additions & 1 deletion core/src/services/azdls/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;

use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
use http::HeaderName;
use http::HeaderValue;
use http::Request;
Expand Down Expand Up @@ -153,6 +153,14 @@ impl AzdlsCore {
req = req.header(CONTENT_DISPOSITION, pos)
}

if args.if_not_exists() {
req = req.header(IF_NONE_MATCH, "*")
}

if let Some(v) = args.if_none_match() {
req = req.header(IF_NONE_MATCH, v)
}

// Set body
let req = req.body(body).map_err(new_request_build_error)?;

Expand Down
4 changes: 3 additions & 1 deletion core/src/services/azdls/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, false),
StatusCode::PRECONDITION_FAILED | StatusCode::CONFLICT => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ impl Access for CosBackend {
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_disposition: true,
// TODO: set this to false while version has been enabled.
write_with_if_not_exists: true,
// The min multipart size of COS is 1 MiB.
//
// ref: <https://www.tencentcloud.com/document/product/436/14112>
Expand Down
12 changes: 12 additions & 0 deletions core/src/services/cos/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@ impl CosCore {
req = req.header(CONTENT_TYPE, mime)
}

// For a bucket which has never enabled versioning, you may use it to
// specify whether to prohibit overwriting the object with the same name
// when uploading the object:
//
// When the x-cos-forbid-overwrite is specified as true, overwriting the object
// with the same name will be prohibited.
//
// ref: https://www.tencentcloud.com/document/product/436/7749
if args.if_not_exists() {
req = req.header("x-cos-forbid-overwrite", "true")
}

let req = req.body(body).map_err(new_request_build_error)?;

Ok(req)
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/cos/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED | StatusCode::CONFLICT => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ impl Access for GcsBackend {
write_can_multi: true,
write_with_content_type: true,
write_with_user_metadata: true,
write_with_if_not_exists: true,

// The min multipart size of Gcs is 5 MiB.
//
// ref: <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
Expand Down
7 changes: 7 additions & 0 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ impl GcsCore {
write!(&mut url, "&predefinedAcl={}", acl).unwrap();
}

// Makes the operation conditional on whether the object's current generation
// matches the given value. Setting to 0 makes the operation succeed only if
// there are no live versions of the object.
if op.if_not_exists() {
write!(&mut url, "&ifGenerationMatch=0").unwrap();
}

let mut req = Request::post(&url);

req = req.header(CONTENT_LENGTH, size.unwrap_or_default());
Expand Down
3 changes: 3 additions & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ impl Access for OssBackend {
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_disposition: true,
// TODO: set this to false while version has been enabled.
write_with_if_not_exists: true,

// The min multipart size of OSS is 100 KiB.
//
// ref: <https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
Expand Down
17 changes: 17 additions & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ use serde::Deserialize;
use serde::Serialize;

use crate::raw::*;
use crate::services::oss::core::constants::X_OSS_FORBID_OVERWRITE;
use crate::*;

pub mod constants {
pub const X_OSS_SERVER_SIDE_ENCRYPTION: &str = "x-oss-server-side-encryption";

pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str = "x-oss-server-side-encryption-key-id";

pub const X_OSS_FORBID_OVERWRITE: &str = "x-oss-forbid-overwrite";

pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";

pub const OSS_QUERY_VERSION_ID: &str = "versionId";
Expand Down Expand Up @@ -181,6 +184,20 @@ impl OssCore {
req = req.header(CACHE_CONTROL, cache_control);
}

// TODO: disable if not exists while version has been enabled.
//
// Specifies whether the object that is uploaded by calling the PutObject operation
// overwrites the existing object that has the same name. When versioning is enabled
// or suspended for the bucket to which you want to upload the object, the
// x-oss-forbid-overwrite header does not take effect. In this case, the object that
// is uploaded by calling the PutObject operation overwrites the existing object that
// has the same name.
//
// ref: https://www.alibabacloud.com/help/en/oss/developer-reference/putobject?spm=a2c63.p38356.0.0.39ef75e93o0Xtz
if args.if_not_exists() {
req = req.header(X_OSS_FORBID_OVERWRITE, "true");
}

if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
// before insert user defined metadata header, add prefix to the header name
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/oss/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED | StatusCode::CONFLICT => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
Expand Down
8 changes: 4 additions & 4 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ impl S3Core {
req = req.header(CACHE_CONTROL, cache_control)
}

if args.if_not_exists() {
req = req.header(IF_NONE_MATCH, "*");
}

// Set storage class header
if let Some(v) = &self.default_storage_class {
req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
Expand All @@ -476,10 +480,6 @@ impl S3Core {
req = self.insert_checksum_header(req, &checksum);
}

if args.if_not_exists() {
req = req.header(IF_NONE_MATCH, "*");
}

// Set body
let req = req.body(body).map_err(new_request_build_error)?;

Expand Down

0 comments on commit 084892f

Please sign in to comment.