Skip to content

Commit

Permalink
refactor: Implement RFC-3911 Deleter API (#5392)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Dec 5, 2024
1 parent f3bf1d4 commit c77a07a
Show file tree
Hide file tree
Showing 176 changed files with 5,013 additions and 1,898 deletions.
2 changes: 1 addition & 1 deletion core/src/docs/rfcs/3911_deleter_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ And the `delete` API will be changed to return a `oio::Delete` instead:

```diff
trait Accessor {
- async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete>;
- async fn delete(&self) -> Result<(RpDelete, Self::Deleter)>;
+ async fn delete(&self, args: OpDelete) -> Result<(RpDelete, Self::Deleter)>;
}
```
Expand Down
49 changes: 38 additions & 11 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
type BlockingWriter = AsyncBacktraceWrapper<A::BlockingWriter>;
type Lister = AsyncBacktraceWrapper<A::Lister>;
type BlockingLister = AsyncBacktraceWrapper<A::BlockingLister>;
type Deleter = AsyncBacktraceWrapper<A::Deleter>;
type BlockingDeleter = AsyncBacktraceWrapper<A::BlockingDeleter>;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -102,8 +104,11 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
}

#[async_backtrace::framed]
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.inner.delete(path, args).await
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner
.delete()
.await
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

#[async_backtrace::framed]
Expand All @@ -114,11 +119,6 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

#[async_backtrace::framed]
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner.batch(args).await
}

#[async_backtrace::framed]
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.inner.presign(path, args).await
Expand All @@ -141,6 +141,12 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
.blocking_list(path, args)
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner
.blocking_delete()
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}
}

pub struct AsyncBacktraceWrapper<R> {
Expand Down Expand Up @@ -173,13 +179,13 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
}

#[async_backtrace::framed]
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
async fn close(&mut self) -> Result<()> {
self.inner.close().await
}

#[async_backtrace::framed]
async fn close(&mut self) -> Result<()> {
self.inner.close().await
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
}

Expand All @@ -205,3 +211,24 @@ impl<R: oio::BlockingList> oio::BlockingList for AsyncBacktraceWrapper<R> {
self.inner.next()
}
}

impl<R: oio::Delete> oio::Delete for AsyncBacktraceWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

#[async_backtrace::framed]
async fn flush(&mut self) -> Result<usize> {
self.inner.flush().await
}
}

impl<R: oio::BlockingDelete> oio::BlockingDelete for AsyncBacktraceWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

fn flush(&mut self) -> Result<usize> {
self.inner.flush()
}
}
43 changes: 34 additions & 9 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
type BlockingWriter = AwaitTreeWrapper<A::BlockingWriter>;
type Lister = AwaitTreeWrapper<A::Lister>;
type BlockingLister = AwaitTreeWrapper<A::BlockingLister>;
type Deleter = AwaitTreeWrapper<A::Deleter>;
type BlockingDeleter = AwaitTreeWrapper<A::BlockingDeleter>;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -118,11 +120,12 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
.await
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner
.delete(path, args)
.delete()
.instrument_await(format!("opendal::{}", Operation::Delete))
.await
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
Expand All @@ -140,13 +143,6 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
.await
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner
.batch(args)
.instrument_await(format!("opendal::{}", Operation::Batch))
.await
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner
.blocking_read(path, args)
Expand All @@ -164,6 +160,12 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
.blocking_list(path, args)
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner
.blocking_delete()
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}
}

pub struct AwaitTreeWrapper<R> {
Expand Down Expand Up @@ -235,3 +237,26 @@ impl<R: oio::BlockingList> oio::BlockingList for AwaitTreeWrapper<R> {
self.inner.next()
}
}

impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

async fn flush(&mut self) -> Result<usize> {
self.inner
.flush()
.instrument_await(format!("opendal::{}", Operation::DeleterFlush))
.await
}
}

impl<R: oio::BlockingDelete> oio::BlockingDelete for AwaitTreeWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

fn flush(&mut self) -> Result<usize> {
self.inner.flush()
}
}
28 changes: 20 additions & 8 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
type BlockingWriter = BlockingWrapper<A::Writer>;
type Lister = A::Lister;
type BlockingLister = BlockingWrapper<A::Lister>;
type Deleter = A::Deleter;
type BlockingDeleter = BlockingWrapper<A::Deleter>;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -203,8 +205,8 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
self.inner.stat(path, args).await
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.inner.delete(path, args).await
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner.delete().await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
Expand All @@ -215,10 +217,6 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
self.inner.presign(path, args).await
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner.batch(args).await
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.handle.block_on(self.inner.create_dir(path, args))
}
Expand Down Expand Up @@ -252,8 +250,12 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
self.handle.block_on(self.inner.stat(path, args))
}

fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.handle.block_on(self.inner.delete(path, args))
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.handle.block_on(async {
let (rp, writer) = self.inner.delete().await?;
let blocking_deleter = Self::BlockingDeleter::new(self.handle.clone(), writer);
Ok((rp, blocking_deleter))
})
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
Expand Down Expand Up @@ -298,6 +300,16 @@ impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> {
}
}

impl<I: oio::Delete + 'static> oio::BlockingDelete for BlockingWrapper<I> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

fn flush(&mut self) -> Result<usize> {
self.handle.block_on(self.inner.flush())
}
}

#[cfg(test)]
mod tests {
use once_cell::sync::Lazy;
Expand Down
16 changes: 14 additions & 2 deletions core/src/layers/capability_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ impl<A: Access> Debug for CapabilityAccessor<A> {
impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
type Inner = A;
type Reader = A::Reader;
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Lister = A::Lister;
type Deleter = A::Deleter;
type BlockingReader = A::BlockingReader;
type BlockingWriter = A::BlockingWriter;
type BlockingLister = A::BlockingLister;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -123,6 +125,10 @@ impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
self.inner.write(path, args).await
}

async fn delete(&self) -> crate::Result<(RpDelete, Self::Deleter)> {
self.inner.delete().await
}

async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> {
let capability = self.info.full_capability();
if !capability.list_with_version && args.version() {
Expand Down Expand Up @@ -175,6 +181,10 @@ impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
self.inner.blocking_write(path, args)
}

fn blocking_delete(&self) -> crate::Result<(RpDelete, Self::BlockingDeleter)> {
self.inner.blocking_delete()
}

fn blocking_list(
&self,
path: &str,
Expand Down Expand Up @@ -207,9 +217,11 @@ mod tests {
type Reader = oio::Reader;
type Writer = oio::Writer;
type Lister = oio::Lister;
type Deleter = oio::Deleter;
type BlockingReader = oio::BlockingReader;
type BlockingWriter = oio::BlockingWriter;
type BlockingLister = oio::BlockingLister;
type BlockingDeleter = oio::BlockingDeleter;

fn info(&self) -> Arc<AccessorInfo> {
let mut info = AccessorInfo::default();
Expand Down
10 changes: 10 additions & 0 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ impl<A: Access> LayeredAccess for ChaosAccessor<A> {
type BlockingWriter = A::BlockingWriter;
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
type Deleter = A::Deleter;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -138,6 +140,14 @@ impl<A: Access> LayeredAccess for ChaosAccessor<A> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.inner.blocking_list(path, args)
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner.delete().await
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner.blocking_delete()
}
}

/// ChaosReader will inject error into read operations.
Expand Down
14 changes: 14 additions & 0 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
type Lister = CompleteLister<A, A::Lister>;
type BlockingLister = CompleteLister<A, A::BlockingLister>;
type Deleter = A::Deleter;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -373,10 +375,18 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
self.complete_stat(path, args).await
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner().delete().await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.complete_list(path, args).await
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.inner.presign(path, args).await
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.complete_blocking_create_dir(path, args)
}
Expand All @@ -398,6 +408,10 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
self.complete_blocking_stat(path, args)
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner().blocking_delete()
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.complete_blocking_list(path, args)
}
Expand Down
Loading

0 comments on commit c77a07a

Please sign in to comment.