Skip to content

Commit

Permalink
feat(services/supabase): Add read/write/stat support for supabase (#2119
Browse files Browse the repository at this point in the history
)

* add supabase most basic skeleton

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* minimal buildable skeleton core for supabase

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* minimal writer

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* remove pager

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* minimal read/write functioning supabase

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* mux the code and implement stat

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* polish key loading

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* more docs

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* uncomment the conditional compilation

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* fix rebase

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* comments

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* service key only

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* check content length

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* simplify logics and use key only

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

* minor refactor

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>

---------

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
  • Loading branch information
xyjixyjixyji authored Apr 27, 2023
1 parent c6a37ad commit dd290ab
Show file tree
Hide file tree
Showing 10 changed files with 660 additions and 3 deletions.
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ services-s3 = [
"reqsign?/reqwest_request",
]
services-sled = ["dep:sled"]
services-supabase = []
services-wasabi = [
"dep:reqsign",
"reqsign?/services-aws",
Expand Down
5 changes: 5 additions & 0 deletions core/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ mod sled;
#[cfg(feature = "services-sled")]
pub use self::sled::Sled;

// #[cfg(feature = "services-supabase")]
mod supabase;
// #[cfg(feature = "services-supabase")]
pub use supabase::Supabase;

#[cfg(feature = "services-wasabi")]
mod wasabi;
#[cfg(feature = "services-wasabi")]
Expand Down
2 changes: 0 additions & 2 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,6 @@ impl S3Core {
self.send(req).await
}

/// Make this functions as `pub(suber)` because `DirStream` depends
/// on this.
pub async fn s3_list_objects(
&self,
path: &str,
Expand Down
274 changes: 274 additions & 0 deletions core/src/services/supabase/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;
use log::debug;

use super::core::*;
use super::error::parse_error;
use super::writer::*;
use crate::ops::*;
use crate::raw::*;
use crate::*;

/// Supabase service
///
/// # Capabilities
///
/// - [x] read
/// - [x] write
/// - [ ] copy
/// - [ ] list
/// - [ ] scan
/// - [ ] presign
/// - [ ] blocking
///
/// # Configuration
///
/// - `root`: Set the work dir for backend.
/// - `bucket`: Set the container name for backend.
/// - `endpoint`: Set the endpoint for backend.
/// - `key`: Set the authorization key for the backend, do not set if you want to read public bucket
///
/// ## Authorization keys
///
/// There are two types of key in the Supabase, one is anon_key(Client key), another one is
/// service_role_key(Secret key). The former one can only write public resources while the latter one
/// can access all resources. Note that if you want to read public resources, do not set the key.
///
/// # Example
///
/// ```no_run
/// use anyhow::Result;
/// use opendal::services::Supabase;
/// use opendal::Operator;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let mut builder = Supabase::default();
/// builder.root("/");
/// builder.bucket("test_bucket");
/// builder.endpoint("http://127.0.0.1:54321");
/// // this sets up the anon_key, which means this operator can only write public resource
/// builder.key("some_anon_key");
///
/// let op: Operator = Operator::new(builder)?.finish();
///
/// Ok(())
/// }
/// ```
#[derive(Default)]
pub struct SupabaseBuilder {
root: Option<String>,

bucket: String,
endpoint: Option<String>,

key: Option<String>,

// todo: optional public, currently true always
// todo: optional file_size_limit, currently 0
// todo: optional allowed_mime_types, currently only string
http_client: Option<HttpClient>,
}

impl Debug for SupabaseBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SupabaseBuilder")
.field("root", &self.root)
.field("bucket", &self.bucket)
.field("endpoint", &self.endpoint)
.finish_non_exhaustive()
}
}

impl SupabaseBuilder {
/// Set root of this backend.
///
/// All operations will happen under this root.
pub fn root(&mut self, root: &str) -> &mut Self {
self.root = if root.is_empty() {
None
} else {
Some(root.to_string())
};

self
}

/// Set bucket name of this backend.
pub fn bucket(&mut self, bucket: &str) -> &mut Self {
self.bucket = bucket.to_string();
self
}

/// Set endpoint of this backend.
///
/// Endpoint must be full uri
pub fn endpoint(&mut self, endpoint: &str) -> &mut Self {
self.endpoint = if endpoint.is_empty() {
None
} else {
Some(endpoint.trim_end_matches('/').to_string())
};

self
}

/// Set the authorization key for this backend
/// Do not set this key if you want to read public bucket
pub fn key(&mut self, key: &str) -> &mut Self {
self.key = Some(key.to_string());
self
}

/// Specify the http client that used by this service.
///
/// # Notes
///
/// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
/// during minor updates.
pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
self.http_client = Some(client);
self
}
}

impl Builder for SupabaseBuilder {
const SCHEME: Scheme = Scheme::Supabase;
type Accessor = SupabaseBackend;

fn from_map(map: std::collections::HashMap<String, String>) -> Self {
let mut builder = SupabaseBuilder::default();

map.get("root").map(|v| builder.root(v));
map.get("bucket").map(|v| builder.bucket(v));
map.get("endpoint").map(|v| builder.endpoint(v));
map.get("key").map(|v| builder.key(v));

builder
}

fn build(&mut self) -> Result<Self::Accessor> {
let root = normalize_root(&self.root.take().unwrap_or_default());
debug!("backend use root {}", &root);

let bucket = &self.bucket;

let endpoint = self.endpoint.take().unwrap_or_default();

let http_client = if let Some(client) = self.http_client.take() {
client
} else {
HttpClient::new().map_err(|err| {
err.with_operation("Builder::build")
.with_context("service", Scheme::Supabase)
})?
};

let key = self.key.as_ref().map(|k| k.to_owned());

let core = SupabaseCore::new(&root, bucket, &endpoint, key, http_client);

let core = Arc::new(core);

Ok(SupabaseBackend { core })
}
}

#[derive(Debug)]
pub struct SupabaseBackend {
core: Arc<SupabaseCore>,
}

#[async_trait]
impl Accessor for SupabaseBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
type Writer = SupabaseWriter;
type BlockingWriter = ();
// todo: implement Pager to support list and scan
type Pager = ();
type BlockingPager = ();

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Supabase)
.set_root(&self.core.root)
.set_name(&self.core.bucket)
.set_capability(Capability {
stat: true,
read: true,
write: true,

..Default::default()
});

am
}

async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
let resp = self.core.supabase_get_object(path).await?;

let status = resp.status();

match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
}
_ => Err(parse_error(resp).await?),
}
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
"write without content length is not supported",
));
}

Ok((
RpWrite::default(),
SupabaseWriter::new(self.core.clone(), path, args),
))
}

async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
// Stat root always returns a DIR.
if path == "/" {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}

let resp = self.core.supabase_get_object_info(path).await?;

let status = resp.status();

match status {
StatusCode::OK => parse_into_metadata(path, resp.headers()).map(RpStat::new),
StatusCode::NOT_FOUND if path.ends_with('/') => {
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
}
_ => Err(parse_error(resp).await?),
}
}
}
Loading

0 comments on commit dd290ab

Please sign in to comment.