Skip to content

Commit

Permalink
feat(services/s3): add append support
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank-III committed Dec 18, 2024
1 parent a9d0ec6 commit b9e78df
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 41 deletions.
20 changes: 16 additions & 4 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,12 @@ impl Access for S3Backend {
write: true,
write_can_empty: true,
write_can_multi: true,
// Only S3 Express One Zone storage class supports append.
write_can_append: self
.core
.default_storage_class
.as_ref()
.is_some_and(|v| v == "EXPRESS_ONEZONE"),
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_encoding: true,
Expand Down Expand Up @@ -1029,11 +1035,17 @@ impl Access for S3Backend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let concurrent = args.concurrent();
let executor = args.executor().cloned();
let writer = S3Writer::new(self.core.clone(), path, args);
let writer = S3Writer::new(self.core.clone(), path, args.clone());

let w = oio::MultipartWriter::new(writer, executor, concurrent);
let w = if args.append() {
S3Writers::Two(oio::AppendWriter::new(writer))
} else {
S3Writers::One(oio::MultipartWriter::new(
writer,
args.executor().cloned(),
args.concurrent(),
))
};

Ok((RpWrite::default(), w))
}
Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ pub struct S3Config {
/// - `GLACIER_IR`
/// - `INTELLIGENT_TIERING`
/// - `ONEZONE_IA`
/// - `EXPRESS_ONEZONE`
/// - `OUTPOSTS`
/// - `REDUCED_REDUNDANCY`
/// - `STANDARD`
Expand Down
111 changes: 75 additions & 36 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub mod constants {
pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
"x-amz-copy-source-server-side-encryption-customer-key-md5";

pub const X_AMZ_WRITE_OFFSET_BYTES: &str = "x-amz-write-offset-bytes";

pub const X_AMZ_META_PREFIX: &str = "x-amz-meta-";

pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";
Expand Down Expand Up @@ -289,6 +291,54 @@ impl S3Core {
}
req
}

pub fn insert_metadata_headers(
&self,
mut req: http::request::Builder,
size: Option<u64>,
args: &OpWrite,
) -> http::request::Builder {
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size.to_string())
}

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime)
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos)
}

if let Some(encoding) = args.content_encoding() {
req = req.header(CONTENT_ENCODING, encoding);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}

if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}

if args.if_not_exists() {
req = req.header(IF_NONE_MATCH, "*");
}

// Set storage class header
if let Some(v) = &self.default_storage_class {
req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
}

// Set user metadata headers.
if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
}
}
req
}
}

impl S3Core {
Expand Down Expand Up @@ -441,55 +491,44 @@ impl S3Core {

let mut req = Request::put(&url);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size.to_string())
}
req = self.insert_metadata_headers(req, size, args);

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime)
}
// Set SSE headers.
req = self.insert_sse_headers(req, true);

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos)
// Calculate Checksum.
if let Some(checksum) = self.calculate_checksum(&body) {
// Set Checksum header.
req = self.insert_checksum_header(req, &checksum);
}

if let Some(encoding) = args.content_encoding() {
req = req.header(CONTENT_ENCODING, encoding);
}
// Set body
let req = req.body(body).map_err(new_request_build_error)?;

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}
Ok(req)
}

if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
pub async fn s3_append_object_request(
&self,
path: &str,
position: u64,
size: u64,
args: &OpWrite,
body: Buffer,
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

if args.if_not_exists() {
req = req.header(IF_NONE_MATCH, "*");
}
let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));

// Set storage class header
if let Some(v) = &self.default_storage_class {
req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
}
let mut req = Request::put(&url);

// Set user metadata headers.
if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value)
}
}
req = self.insert_metadata_headers(req, Some(size), args);

req = req.header(constants::X_AMZ_WRITE_OFFSET_BYTES, position.to_string());

// Set SSE headers.
req = self.insert_sse_headers(req, true);

// Calculate Checksum.
if let Some(checksum) = self.calculate_checksum(&body) {
// Set Checksum header.
req = self.insert_checksum_header(req, &checksum);
}

// Set body
let req = req.body(body).map_err(new_request_build_error)?;

Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This service can be used to:
- [x] stat
- [x] read
- [x] write
- [x] append
- [x] create_dir
- [x] delete
- [x] copy
Expand Down
36 changes: 35 additions & 1 deletion core/src/services/s3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::error::S3Error;
use crate::raw::*;
use crate::*;

pub type S3Writers = oio::MultipartWriter<S3Writer>;
pub type S3Writers = TwoWays<oio::MultipartWriter<S3Writer>, oio::AppendWriter<S3Writer>>;

pub struct S3Writer {
core: Arc<S3Core>,
Expand Down Expand Up @@ -188,3 +188,37 @@ impl oio::MultipartWrite for S3Writer {
}
}
}

impl oio::AppendWrite for S3Writer {
async fn offset(&self) -> Result<u64> {
let resp = self
.core
.s3_head_object(&self.path, OpStat::default())
.await?;

let status = resp.status();

match status {
StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()),
StatusCode::NOT_FOUND => Ok(0),
_ => Err(parse_error(resp)),
}
}

async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> {
let mut req = self
.core
.s3_append_object_request(&self.path, offset, size, &self.op, body)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::OK => Ok(()),
_ => Err(parse_error(resp)),
}
}
}

0 comments on commit b9e78df

Please sign in to comment.