Skip to content

Commit

Permalink
refactor: Use list without delimiter to replace scan (#2243)
Browse files Browse the repository at this point in the history
* Fix naming

Signed-off-by: Xuanwo <github@xuanwo.io>

* Remove scan

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix test

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix test

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix webhdfs

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix unit tests

Signed-off-by: Xuanwo <github@xuanwo.io>

* FIx test

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored May 8, 2023
1 parent 60bcbf7 commit 4246bfd
Show file tree
Hide file tree
Showing 44 changed files with 299 additions and 721 deletions.
8 changes: 0 additions & 8 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,9 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
self.inner.list(path, args).await
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.inner.scan(path, args).await
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.inner.blocking_list(path, args)
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
self.inner.blocking_scan(path, args)
}
}

/// ChaosReader will inject error into read operations.
Expand Down
149 changes: 73 additions & 76 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,93 +225,98 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::Pager>)> {
let capability = self.meta.capability();
let (can_list, can_scan) = (capability.list, capability.scan);

if can_list {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_scan {
let (_, p) = self.inner.scan(path, OpScan::new()).await?;
let p = to_hierarchy_pager(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
} else {
Err(
let cap = self.meta.capability();
if !cap.list {
return Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("list"),
)
);
}

let delimiter = args.delimiter();

if delimiter.is_empty() {
return if cap.list_without_delimiter {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let p = to_flat_pager(
self.inner.clone(),
path,
args.with_delimiter("/").limit().unwrap_or(1000),
);
Ok((RpList::default(), CompletePager::NeedFlat(p)))
};
}

if delimiter == "/" {
return if cap.list_with_delimiter_slash {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let (_, p) = self.inner.list(path, args.with_delimiter("")).await?;
let p = to_hierarchy_pager(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
};
}

Err(Error::new(
ErrorKind::Unsupported,
"list with other delimiter is not supported",
)
.with_context("service", self.meta.scheme())
.with_context("delimiter", delimiter))
}

fn complete_blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::BlockingPager>)> {
let capability = self.meta.capability();
let (can_list, can_scan) = (capability.list, capability.scan);

if can_list {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_scan {
let (_, p) = self.inner.blocking_scan(path, OpScan::new())?;
let p = to_hierarchy_pager(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
} else {
Err(
let cap = self.meta.capability();
if !cap.list {
return Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("list"),
)
);
}
}

async fn complete_scan(
&self,
path: &str,
args: OpScan,
) -> Result<(RpScan, CompletePager<A, A::Pager>)> {
let capability = self.meta.capability();
let (can_list, can_scan) = (capability.list, capability.scan);

if can_scan {
let (rp, p) = self.inner.scan(path, args).await?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_list {
let p = to_flat_pager(self.inner.clone(), path, args.limit().unwrap_or(1000));
Ok((RpScan::default(), CompletePager::NeedFlat(p)))
} else {
Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("scan"),
)
let delimiter = args.delimiter();

if delimiter.is_empty() {
return if cap.list_without_delimiter {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let p = to_flat_pager(
self.inner.clone(),
path,
args.with_delimiter("/").limit().unwrap_or(1000),
);
Ok((RpList::default(), CompletePager::NeedFlat(p)))
};
}
}

fn complete_blocking_scan(
&self,
path: &str,
args: OpScan,
) -> Result<(RpScan, CompletePager<A, A::BlockingPager>)> {
let capability = self.meta.capability();
let (can_list, can_scan) = (capability.list, capability.scan);

if can_scan {
let (rp, p) = self.inner.blocking_scan(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else if can_list {
let p = to_flat_pager(self.inner.clone(), path, args.limit().unwrap_or(1000));
Ok((RpScan::default(), CompletePager::NeedFlat(p)))
} else {
Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
.with_context("service", self.meta.scheme())
.with_operation("scan"),
)
if delimiter == "/" {
return if cap.list_with_delimiter_slash {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompletePager::AlreadyComplete(p)))
} else {
let (_, p) = self.inner.blocking_list(path, args.with_delimiter(""))?;
let p: ToHierarchyPager<<A as Accessor>::BlockingPager> =
to_hierarchy_pager(p, path);
Ok((RpList::default(), CompletePager::NeedHierarchy(p)))
};
}

Err(Error::new(
ErrorKind::Unsupported,
"list with other delimiter is not supported",
)
.with_context("service", self.meta.scheme())
.with_context("delimiter", delimiter))
}
}

Expand Down Expand Up @@ -377,14 +382,6 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.complete_blocking_list(path, args)
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.complete_scan(path, args).await
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
self.complete_blocking_scan(path, args)
}
}

pub enum CompleteReader<A: Accessor, R> {
Expand Down
30 changes: 2 additions & 28 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
&self.inner
}

async fn create_dir(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let _permit = self
.semaphore
.acquire()
Expand Down Expand Up @@ -166,20 +166,6 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit)))
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
let permit = self
.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore must be valid");

self.inner
.scan(path, args)
.await
.map(|(rp, s)| (rp, ConcurrentLimitWrapper::new(s, permit)))
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let _permit = self
.semaphore
Expand All @@ -190,7 +176,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.batch(args).await
}

fn blocking_create_dir(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let _permit = self
.semaphore
.try_acquire()
Expand Down Expand Up @@ -252,18 +238,6 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.blocking_list(path, args)
.map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit)))
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
let permit = self
.semaphore
.clone()
.try_acquire_owned()
.expect("semaphore must be valid");

self.inner
.blocking_scan(path, args)
.map(|(rp, it)| (rp, ConcurrentLimitWrapper::new(it, permit)))
}
}

pub struct ConcurrentLimitWrapper<R> {
Expand Down
45 changes: 2 additions & 43 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
self.meta.clone()
}

async fn create_dir(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.inner
.create_dir(path, args)
.map_err(|err| {
Expand Down Expand Up @@ -205,27 +205,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.inner
.scan(path, args)
.map_ok(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
)
})
.map_err(|err| {
err.with_operation(Operation::Scan)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.inner.presign(path, args).await.map_err(|err| {
err.with_operation(Operation::Presign)
Expand Down Expand Up @@ -260,7 +239,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

fn blocking_create_dir(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.inner.blocking_create_dir(path, args).map_err(|err| {
err.with_operation(Operation::BlockingCreateDir)
.with_context("service", self.meta.scheme())
Expand Down Expand Up @@ -361,26 +340,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.with_context("path", path)
})
}

fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
self.inner
.blocking_scan(path, args)
.map(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
)
})
.map_err(|err| {
err.with_operation(Operation::BlockingScan)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
}
}

pub struct ErrorContextWrapper<T> {
Expand Down
Loading

0 comments on commit 4246bfd

Please sign in to comment.