diff --git a/core/Cargo.toml b/core/Cargo.toml index 52330e0b1ef..b8bbd0e989c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -134,6 +134,7 @@ services-s3 = [ "reqsign?/reqwest_request", ] services-sled = ["dep:sled"] +services-supabase = [] services-wasabi = [ "dep:reqsign", "reqsign?/services-aws", diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 4c1b7397164..ab0aec0a816 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -119,6 +119,11 @@ mod sled; #[cfg(feature = "services-sled")] pub use self::sled::Sled; +// #[cfg(feature = "services-supabase")] +mod supabase; +// #[cfg(feature = "services-supabase")] +pub use supabase::Supabase; + #[cfg(feature = "services-wasabi")] mod wasabi; #[cfg(feature = "services-wasabi")] diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 0081656f181..eba880c07e5 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -446,8 +446,6 @@ impl S3Core { self.send(req).await } - /// Make this functions as `pub(suber)` because `DirStream` depends - /// on this. pub async fn s3_list_objects( &self, path: &str, diff --git a/core/src/services/supabase/backend.rs b/core/src/services/supabase/backend.rs new file mode 100644 index 00000000000..ae252d0824b --- /dev/null +++ b/core/src/services/supabase/backend.rs @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; +use http::StatusCode; +use log::debug; + +use super::core::*; +use super::error::parse_error; +use super::writer::*; +use crate::ops::*; +use crate::raw::*; +use crate::*; + +/// Supabase service +/// +/// # Capabilities +/// +/// - [x] read +/// - [x] write +/// - [ ] copy +/// - [ ] list +/// - [ ] scan +/// - [ ] presign +/// - [ ] blocking +/// +/// # Configuration +/// +/// - `root`: Set the work dir for backend. +/// - `bucket`: Set the container name for backend. +/// - `endpoint`: Set the endpoint for backend. +/// - `key`: Set the authorization key for the backend, do not set if you want to read public bucket +/// +/// ## Authorization keys +/// +/// There are two types of key in the Supabase, one is anon_key(Client key), another one is +/// service_role_key(Secret key). The former one can only write public resources while the latter one +/// can access all resources. Note that if you want to read public resources, do not set the key. +/// +/// # Example +/// +/// ```no_run +/// use anyhow::Result; +/// use opendal::services::Supabase; +/// use opendal::Operator; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// let mut builder = Supabase::default(); +/// builder.root("/"); +/// builder.bucket("test_bucket"); +/// builder.endpoint("http://127.0.0.1:54321"); +/// // this sets up the anon_key, which means this operator can only write public resource +/// builder.key("some_anon_key"); +/// +/// let op: Operator = Operator::new(builder)?.finish(); +/// +/// Ok(()) +/// } +/// ``` +#[derive(Default)] +pub struct SupabaseBuilder { + root: Option, + + bucket: String, + endpoint: Option, + + key: Option, + + // todo: optional public, currently true always + // todo: optional file_size_limit, currently 0 + // todo: optional allowed_mime_types, currently only string + http_client: Option, +} + +impl Debug for SupabaseBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SupabaseBuilder") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("endpoint", &self.endpoint) + .finish_non_exhaustive() + } +} + +impl SupabaseBuilder { + /// Set root of this backend. + /// + /// All operations will happen under this root. + pub fn root(&mut self, root: &str) -> &mut Self { + self.root = if root.is_empty() { + None + } else { + Some(root.to_string()) + }; + + self + } + + /// Set bucket name of this backend. + pub fn bucket(&mut self, bucket: &str) -> &mut Self { + self.bucket = bucket.to_string(); + self + } + + /// Set endpoint of this backend. + /// + /// Endpoint must be full uri + pub fn endpoint(&mut self, endpoint: &str) -> &mut Self { + self.endpoint = if endpoint.is_empty() { + None + } else { + Some(endpoint.trim_end_matches('/').to_string()) + }; + + self + } + + /// Set the authorization key for this backend + /// Do not set this key if you want to read public bucket + pub fn key(&mut self, key: &str) -> &mut Self { + self.key = Some(key.to_string()); + self + } + + /// Specify the http client that used by this service. + /// + /// # Notes + /// + /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed + /// during minor updates. + pub fn http_client(&mut self, client: HttpClient) -> &mut Self { + self.http_client = Some(client); + self + } +} + +impl Builder for SupabaseBuilder { + const SCHEME: Scheme = Scheme::Supabase; + type Accessor = SupabaseBackend; + + fn from_map(map: std::collections::HashMap) -> Self { + let mut builder = SupabaseBuilder::default(); + + map.get("root").map(|v| builder.root(v)); + map.get("bucket").map(|v| builder.bucket(v)); + map.get("endpoint").map(|v| builder.endpoint(v)); + map.get("key").map(|v| builder.key(v)); + + builder + } + + fn build(&mut self) -> Result { + let root = normalize_root(&self.root.take().unwrap_or_default()); + debug!("backend use root {}", &root); + + let bucket = &self.bucket; + + let endpoint = self.endpoint.take().unwrap_or_default(); + + let http_client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::Supabase) + })? + }; + + let key = self.key.as_ref().map(|k| k.to_owned()); + + let core = SupabaseCore::new(&root, bucket, &endpoint, key, http_client); + + let core = Arc::new(core); + + Ok(SupabaseBackend { core }) + } +} + +#[derive(Debug)] +pub struct SupabaseBackend { + core: Arc, +} + +#[async_trait] +impl Accessor for SupabaseBackend { + type Reader = IncomingAsyncBody; + type BlockingReader = (); + type Writer = SupabaseWriter; + type BlockingWriter = (); + // todo: implement Pager to support list and scan + type Pager = (); + type BlockingPager = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_scheme(Scheme::Supabase) + .set_root(&self.core.root) + .set_name(&self.core.bucket) + .set_capability(Capability { + stat: true, + read: true, + write: true, + + ..Default::default() + }); + + am + } + + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let resp = self.core.supabase_get_object(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let meta = parse_into_metadata(path, resp.headers())?; + Ok((RpRead::with_metadata(meta), resp.into_body())) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + if args.content_length().is_none() { + return Err(Error::new( + ErrorKind::Unsupported, + "write without content length is not supported", + )); + } + + Ok(( + RpWrite::default(), + SupabaseWriter::new(self.core.clone(), path, args), + )) + } + + async fn stat(&self, path: &str, _args: OpStat) -> Result { + // Stat root always returns a DIR. + if path == "/" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + let resp = self.core.supabase_get_object_info(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new), + StatusCode::NOT_FOUND if path.ends_with('/') => { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } + _ => Err(parse_error(resp).await?), + } + } +} diff --git a/core/src/services/supabase/core.rs b/core/src/services/supabase/core.rs new file mode 100644 index 00000000000..38391427b0d --- /dev/null +++ b/core/src/services/supabase/core.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; + +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; +use http::HeaderValue; +use http::Request; +use http::Response; + +use crate::raw::*; +use crate::*; + +pub struct SupabaseCore { + pub root: String, + pub bucket: String, + pub endpoint: String, + + /// The key used for authorization + /// If loaded, the read operation will always access the nonpublic resources. + /// If you want to read the public resources, please do not set the key. + pub key: Option, + + pub http_client: HttpClient, +} + +impl Debug for SupabaseCore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SupabaseCore") + .field("root", &self.root) + .field("bucket", &self.bucket) + .field("endpoint", &self.endpoint) + .finish_non_exhaustive() + } +} + +impl SupabaseCore { + pub fn new( + root: &str, + bucket: &str, + endpoint: &str, + key: Option, + client: HttpClient, + ) -> Self { + Self { + root: root.to_string(), + bucket: bucket.to_string(), + endpoint: endpoint.to_string(), + key, + http_client: client, + } + } + + /// Add authorization header to the request if the key is set. Otherwise leave + /// the request as-is. + pub fn sign(&self, req: &mut Request) -> Result<()> { + if let Some(k) = &self.key { + let v = HeaderValue::from_str(&format!("Bearer {}", k)).unwrap(); + req.headers_mut().insert(http::header::AUTHORIZATION, v); + } + Ok(()) + } +} + +// requests +impl SupabaseCore { + pub fn supabase_upload_object_request( + &self, + path: &str, + size: Option, + content_type: Option<&str>, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "{}/storage/v1/object/{}/{}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + let mut req = Request::post(&url); + + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size) + } + + if let Some(mime) = content_type { + req = req.header(CONTENT_TYPE, mime) + } + + let req = req.body(body).map_err(new_request_build_error)?; + + Ok(req) + } + + pub fn supabase_get_object_public_request(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "{}/storage/v1/object/public/{}/{}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error) + } + + pub fn supabase_get_object_auth_request(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "{}/storage/v1/object/authenticated/{}/{}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error) + } + + pub fn supabase_get_object_info_public_request( + &self, + path: &str, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "{}/storage/v1/object/info/public/{}/{}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error) + } + + pub fn supabase_get_object_info_auth_request(&self, path: &str) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!( + "{}/storage/v1/object/info/authenticated/{}/{}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error) + } +} + +// core utils +impl SupabaseCore { + pub async fn send(&self, req: Request) -> Result> { + self.http_client.send(req).await + } + + pub async fn supabase_get_object(&self, path: &str) -> Result> { + let mut req = if self.key.is_some() { + self.supabase_get_object_auth_request(path)? + } else { + self.supabase_get_object_public_request(path)? + }; + self.sign(&mut req)?; + self.send(req).await + } + + pub async fn supabase_get_object_info( + &self, + path: &str, + ) -> Result> { + let mut req = if self.key.is_some() { + self.supabase_get_object_info_auth_request(path)? + } else { + self.supabase_get_object_info_public_request(path)? + }; + self.sign(&mut req)?; + self.send(req).await + } +} diff --git a/core/src/services/supabase/error.rs b/core/src/services/supabase/error.rs new file mode 100644 index 00000000000..150178714cb --- /dev/null +++ b/core/src/services/supabase/error.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use http::Response; +use http::StatusCode; +use serde::Deserialize; +use serde_json::from_slice; + +use crate::{raw::*, Error, ErrorKind, Result}; + +#[derive(Default, Debug, Deserialize)] +#[serde(default, rename_all = "camelCase")] +/// The error returned by Supabase +struct SupabaseError { + status_code: String, + error: String, + message: String, +} + +/// Parse the supabase error type to the OpenDAL error type +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + // todo: the supabase error has status code 4XX, handle all that + let (kind, retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), + StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { + (ErrorKind::ConditionNotMatch, false) + } + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, _) = from_slice::(&bs) + .map(|sb_err| (format!("{sb_err:?}"), Some(sb_err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + let mut err = Error::new(kind, &message).with_context("response", format!("{parts:?}")); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} diff --git a/core/src/services/supabase/mod.rs b/core/src/services/supabase/mod.rs new file mode 100644 index 00000000000..89aeada8898 --- /dev/null +++ b/core/src/services/supabase/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::SupabaseBuilder as Supabase; +mod core; +mod error; +mod writer; diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs new file mode 100644 index 00000000000..5d32a4124b8 --- /dev/null +++ b/core/src/services/supabase/writer.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use http::StatusCode; + +use super::core::*; +use super::error::parse_error; +use crate::ops::OpWrite; +use crate::raw::*; +use crate::*; + +pub struct SupabaseWriter { + core: Arc, + + op: OpWrite, + path: String, +} + +impl SupabaseWriter { + pub fn new(core: Arc, path: &str, op: OpWrite) -> Self { + SupabaseWriter { + core, + op, + path: path.to_string(), + } + } + + pub async fn upload(&self, bytes: Bytes) -> Result<()> { + let size = bytes.len(); + let mut req = self.core.supabase_upload_object_request( + &self.path, + Some(size), + self.op.content_type(), + AsyncBody::Bytes(bytes), + )?; + + self.core.sign(&mut req)?; + + let resp = self.core.send(req).await?; + + match resp.status() { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } +} + +#[async_trait] +impl oio::Write for SupabaseWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + if bs.is_empty() { + return Ok(()); + } + + self.upload(bs).await + } + + async fn abort(&mut self) -> Result<()> { + Err(Error::new( + ErrorKind::Unsupported, + "The abort operation is not yet supported for Supabase backend", + )) + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs index 5190bb2133f..5560fe9fe84 100644 --- a/core/src/services/wasabi/backend.rs +++ b/core/src/services/wasabi/backend.rs @@ -1062,7 +1062,7 @@ impl Accessor for WasabiBackend { if ops.len() > 1000 { return Err(Error::new( ErrorKind::Unsupported, - "s3 services only allow delete up to 1000 keys at once", + "wasabi services only allow delete up to 1000 keys at once", ) .with_context("length", ops.len().to_string())); } diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index aff72868289..3673b172b0e 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -71,6 +71,8 @@ pub enum Scheme { S3, /// [sled][crate::services::Sled]: Sled services Sled, + /// [Supabase][crate::services::Supabase]: Supabase storage service + Supabase, /// [wasabi][crate::services::Wasabi]: Wasabi service Wasabi, /// [webdav][crate::services::Webdav]: WebDAV support. @@ -130,6 +132,7 @@ impl FromStr for Scheme { "rocksdb" => Ok(Scheme::Rocksdb), "s3" => Ok(Scheme::S3), "sled" => Ok(Scheme::Sled), + "supabase" => Ok(Scheme::Supabase), "oss" => Ok(Scheme::Oss), "wasabi" => Ok(Scheme::Wasabi), "webdav" => Ok(Scheme::Webdav), @@ -161,6 +164,7 @@ impl From for &'static str { Scheme::Rocksdb => "rocksdb", Scheme::S3 => "s3", Scheme::Sled => "sled", + Scheme::Supabase => "supabase", Scheme::Oss => "oss", Scheme::Wasabi => "wasabi", Scheme::Webdav => "webdav",