From 9543da2d0cd7dea9f42b1a3191f841890336db65 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 7 May 2023 00:35:11 +0800 Subject: [PATCH] feat(oay): Add basic s3 list_objects_v2 with start_after support (#2219) * feat(oay): Add list objects v2 support Signed-off-by: Xuanwo * Update Signed-off-by: Xuanwo * Return error if list with start_after is not supported Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- Cargo.lock | 129 +++++++++++++++- bin/oay/.gitignore | 1 + bin/oay/Cargo.toml | 10 +- bin/oay/README.md | 4 + bin/oay/oay.toml.example | 9 ++ bin/oay/src/bin/oay.rs | 29 +++- bin/oay/src/config.rs | 42 +++++ bin/oay/{ => src}/lib.rs | 5 + bin/oay/src/services/mod.rs | 19 +++ bin/oay/src/services/s3/mod.rs | 19 +++ bin/oay/src/services/s3/service.rs | 240 +++++++++++++++++++++++++++++ core/src/types/list.rs | 30 ++++ 12 files changed, 532 insertions(+), 5 deletions(-) create mode 100644 bin/oay/.gitignore create mode 100644 bin/oay/oay.toml.example create mode 100644 bin/oay/src/config.rs rename bin/oay/{ => src}/lib.rs (94%) create mode 100644 bin/oay/src/services/mod.rs create mode 100644 bin/oay/src/services/s3/mod.rs create mode 100644 bin/oay/src/services/s3/service.rs diff --git a/Cargo.lock b/Cargo.lock index 9ed71f033438..29253a125e5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -334,6 +334,55 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5b3469636cdf8543cceab175efca534471f36eee12fb8374aba00eb5e7e7f8a" +[[package]] +name = "axum" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backon" version = "0.4.0" @@ -1579,6 +1628,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "http-types" version = "2.12.0" @@ -2105,6 +2160,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "md-5" version = "0.10.5" @@ -2430,16 +2491,22 @@ name = "oay" version = "0.33.3" dependencies = [ "anyhow", + "axum", + "chrono", "clap 4.2.5", "dirs 5.0.0", - "env_logger", "futures", - "log", "opendal", + "quick-xml 0.27.1", "serde", "tokio", "toml 0.7.3", + "tower", + "tower-http", + "tracing", + "tracing-subscriber", "url", + "uuid", ] [[package]] @@ -3955,6 +4022,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0" +dependencies = [ + "serde", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -4257,6 +4333,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "tagptr" version = "0.2.0" @@ -4552,6 +4634,47 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658" +dependencies = [ + "bitflags 1.3.2", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -4565,6 +4688,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -4787,6 +4911,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" dependencies = [ "getrandom 0.2.8", + "rand 0.8.5", "serde", ] diff --git a/bin/oay/.gitignore b/bin/oay/.gitignore new file mode 100644 index 000000000000..e649572204f7 --- /dev/null +++ b/bin/oay/.gitignore @@ -0,0 +1 @@ +oay.toml diff --git a/bin/oay/Cargo.toml b/bin/oay/Cargo.toml index 3bde725cae2d..4f745ec27bf8 100644 --- a/bin/oay/Cargo.toml +++ b/bin/oay/Cargo.toml @@ -31,12 +31,13 @@ version.workspace = true [dependencies] anyhow = "1" +axum = "0.6" +chrono = "0.4.24" clap = { version = "4", features = ["cargo", "string"] } dirs = "5.0.0" -env_logger = "0.10" futures = "0.3" -log = "0.4" opendal.workspace = true +quick-xml = { version = "0.27", features = ["serialize", "overlapped-lists"] } serde = { version = "1", features = ["derive"] } tokio = { version = "1.27", features = [ "fs", @@ -45,4 +46,9 @@ tokio = { version = "1.27", features = [ "io-std", ] } toml = "0.7.3" +tower = "0.4" +tower-http = { version = "0.4", features = ["trace"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.3.1" +uuid = { version = "1", features = ["v4", "fast-rng"] } diff --git a/bin/oay/README.md b/bin/oay/README.md index caa23e57e101..766fd958a23f 100644 --- a/bin/oay/README.md +++ b/bin/oay/README.md @@ -11,3 +11,7 @@ Allow users to access different storage backend through their preferred APIs. ## Status Our first milestone is to provide S3 APIs. + +### S3 API + +Only `list_object_v2` with `start_after` is supported. diff --git a/bin/oay/oay.toml.example b/bin/oay/oay.toml.example new file mode 100644 index 000000000000..ed478c769788 --- /dev/null +++ b/bin/oay/oay.toml.example @@ -0,0 +1,9 @@ +[backend] +type = "s3" + +access_key_id = "access_key_id" +secret_access_key = "secret_access_key" + +[frontends.s3] +enable = true +addr = "127.0.0.1:2000" diff --git a/bin/oay/src/bin/oay.rs b/bin/oay/src/bin/oay.rs index c9b3b63866ab..b5f4813f64c7 100644 --- a/bin/oay/src/bin/oay.rs +++ b/bin/oay/src/bin/oay.rs @@ -16,10 +16,37 @@ // under the License. use anyhow::Result; +use oay::services::S3Service; +use oay::Config; +use opendal::services::Memory; +use opendal::Operator; +use std::sync::Arc; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[tokio::main] async fn main() -> Result<()> { - println!("Hello, world!"); + tracing_subscriber::registry() + .with(fmt::layer().pretty()) + .with(EnvFilter::from_default_env()) + .init(); + + let cfg: Config = Config { + backend: oay::BackendConfig { + typ: "memory".to_string(), + }, + frontends: oay::FrontendsConfig { + s3: oay::S3Config { + enable: true, + addr: "127.0.0.1:3000".to_string(), + }, + }, + }; + + let op = Operator::new(Memory::default())?.finish(); + + let s3 = S3Service::new(Arc::new(cfg), op); + + s3.serve().await?; Ok(()) } diff --git a/bin/oay/src/config.rs b/bin/oay/src/config.rs new file mode 100644 index 000000000000..64ab51c36a0e --- /dev/null +++ b/bin/oay/src/config.rs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::Deserialize; +use serde::Serialize; + +#[derive(Serialize, Deserialize)] +pub struct Config { + pub backend: BackendConfig, + pub frontends: FrontendsConfig, +} + +#[derive(Serialize, Deserialize)] +pub struct BackendConfig { + #[serde(rename = "type")] + pub typ: String, +} + +#[derive(Serialize, Deserialize)] +pub struct FrontendsConfig { + pub s3: S3Config, +} + +#[derive(Serialize, Deserialize)] +pub struct S3Config { + pub enable: bool, + pub addr: String, +} diff --git a/bin/oay/lib.rs b/bin/oay/src/lib.rs similarity index 94% rename from bin/oay/lib.rs rename to bin/oay/src/lib.rs index b248758bc120..ba5cf7a9dd36 100644 --- a/bin/oay/lib.rs +++ b/bin/oay/src/lib.rs @@ -14,3 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +pub mod services; + +mod config; +pub use config::*; diff --git a/bin/oay/src/services/mod.rs b/bin/oay/src/services/mod.rs new file mode 100644 index 000000000000..001951bef0b9 --- /dev/null +++ b/bin/oay/src/services/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod s3; +pub use s3::S3Service; diff --git a/bin/oay/src/services/s3/mod.rs b/bin/oay/src/services/s3/mod.rs new file mode 100644 index 000000000000..236d0d18e98f --- /dev/null +++ b/bin/oay/src/services/s3/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod service; +pub use service::*; diff --git a/bin/oay/src/services/s3/service.rs b/bin/oay/src/services/s3/service.rs new file mode 100644 index 000000000000..84b9c7774fac --- /dev/null +++ b/bin/oay/src/services/s3/service.rs @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use axum::extract::Query; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::response::Response; +use axum::routing::get; +use axum::Router; +use chrono::SecondsFormat; +use opendal::ops::OpList; +use opendal::Metakey; +use opendal::Operator; +use serde::Deserialize; +use serde::Serialize; +use tower::ServiceBuilder; +use tower_http::trace::TraceLayer; +use tracing::debug; + +use crate::Config; + +pub struct S3Service { + cfg: Arc, + op: Operator, +} + +impl S3Service { + pub fn new(cfg: Arc, op: Operator) -> Self { + Self { cfg, op } + } + + pub async fn serve(&self) -> anyhow::Result<()> { + let s3_cfg = &self.cfg.frontends.s3; + + let app = Router::new() + .route("/", get(handle_list_objects)) + .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())) + .with_state(S3State { + op: self.op.clone(), + }); + + axum::Server::bind(&s3_cfg.addr.parse().unwrap()) + .serve(app.into_make_service()) + .await?; + + Ok(()) + } +} + +#[derive(Clone)] +pub struct S3State { + op: Operator, +} + +/// # TODO +/// +/// we need to support following parameters: +/// +/// - max-keys +/// - continuation_token +#[derive(Deserialize, Default, Debug)] +#[serde(default)] +struct ListObjectsV2Params { + prefix: String, + start_after: String, +} + +async fn handle_list_objects( + state: State, + params: Query, +) -> Result { + debug!("got params: {:?}", params); + + if !state.op.info().capability().list_with_start_after { + return Err(ErrorResponse { + code: StatusCode::NOT_IMPLEMENTED, + err: Error { + code: "NotImplemented".to_string(), + message: "list with start after is not supported".to_string(), + resource: "".to_string(), + request_id: "".to_string(), + }, + }); + } + + let mut lister = state + .op + .list_with( + ¶ms.prefix, + OpList::new().with_start_after(¶ms.start_after), + ) + .await?; + + let page = lister.next_page().await?.unwrap_or_default(); + + let is_truncated = lister.has_next().await?; + + let (mut common_prefixes, mut contents) = (vec![], vec![]); + for v in page { + let meta = state + .op + .metadata( + &v, + Metakey::Mode | Metakey::LastModified | Metakey::Etag | Metakey::ContentLength, + ) + .await?; + + if meta.is_dir() { + common_prefixes.push(CommonPrefix { + prefix: v.path().to_string(), + }); + } else { + contents.push(Object { + key: v.path().to_string(), + last_modified: meta + .last_modified() + .unwrap_or_default() + .to_rfc3339_opts(SecondsFormat::Millis, true), + etag: meta.etag().unwrap_or_default().to_string(), + size: meta.content_length(), + }); + } + } + + let resp = ListBucketResult { + is_truncated, + common_prefixes, + contents, + start_after: Some(params.start_after.clone()), + }; + + Ok(OkResponse { + code: StatusCode::OK, + content: quick_xml::se::to_string(&resp).unwrap().into_bytes(), + }) +} + +#[derive(Serialize, Default)] +#[serde(default, rename_all = "PascalCase")] +struct ListBucketResult { + is_truncated: bool, + common_prefixes: Vec, + contents: Vec, + start_after: Option, +} + +#[derive(Serialize, Default)] +#[serde(default, rename_all = "PascalCase")] +struct CommonPrefix { + prefix: String, +} + +#[derive(Serialize, Default)] +#[serde(default, rename_all = "PascalCase")] +struct Object { + key: String, + last_modified: String, + etag: String, + size: u64, +} + +struct OkResponse { + code: StatusCode, + content: Vec, +} + +impl IntoResponse for OkResponse { + fn into_response(self) -> Response { + (self.code, self.content).into_response() + } +} + +struct ErrorResponse { + code: StatusCode, + err: Error, +} + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + (self.code, quick_xml::se::to_string(&self.err).unwrap()).into_response() + } +} + +#[derive(Serialize)] +#[serde(default, rename_all = "PascalCase")] +struct Error { + code: String, + message: String, + resource: String, + request_id: String, +} + +impl From for ErrorResponse { + fn from(err: opendal::Error) -> Self { + let err = Error { + code: "InternalError".to_string(), + message: err.to_string(), + resource: "".to_string(), + request_id: "".to_string(), + }; + + ErrorResponse { + code: StatusCode::INTERNAL_SERVER_ERROR, + err, + } + } +} + +impl From for ErrorResponse { + fn from(err: anyhow::Error) -> Self { + let err = Error { + code: "InternalError".to_string(), + message: err.to_string(), + resource: "".to_string(), + request_id: "".to_string(), + }; + + ErrorResponse { + code: StatusCode::INTERNAL_SERVER_ERROR, + err, + } + } +} diff --git a/core/src/types/list.rs b/core/src/types/list.rs index bc27688e4bba..02395063ca19 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -57,6 +57,36 @@ impl Lister { } } + /// has_next can be used to check if there are more pages. + pub async fn has_next(&mut self) -> Result { + debug_assert!( + self.fut.is_none(), + "there are ongoing futures for next page" + ); + + if !self.buf.is_empty() { + return Ok(true); + } + + let entries = match self + .pager + .as_mut() + .expect("pager must be valid") + .next() + .await? + { + // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. + // + // However, this could be changed as described in [impl From> for VecDeque](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) + Some(entries) => entries.into(), + None => return Ok(false), + }; + // Push fetched entries into buffer. + self.buf = entries; + + Ok(true) + } + /// next_page can be used to fetch a new page. /// /// # Notes