Skip to content

Commit

Permalink
feat(core): basic copy and blocking copy
Browse files Browse the repository at this point in the history
Signed-off-by: suyanhanx <suyanhanx@gmail.com>
  • Loading branch information
suyanhanx committed Apr 3, 2023
1 parent 5787e99 commit b5394b6
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 0 deletions.
81 changes: 81 additions & 0 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,47 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
})
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> started",
self.scheme,
Operation::Copy,
from,
to
);

self.inner
.copy(from, to, args)
.await
.map(|v| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> finished",
self.scheme,
Operation::Copy,
from,
to
);
v
})
.map_err(|err| {
if let Some(lvl) = self.err_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} from={} to={} -> {}: {err:?}",
self.scheme,
Operation::Copy,
from,
to,
self.err_status(&err)
)
};
err
})
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
debug!(
target: LOGGING_TARGET,
Expand Down Expand Up @@ -699,6 +740,46 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
})
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> started",
self.scheme,
Operation::BlockingCopy,
from,
to,
);

self.inner
.blocking_copy(from, to, args)
.map(|v| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> finished",
self.scheme,
Operation::BlockingCopy,
from,
to,
);
v
})
.map_err(|err| {
if let Some(lvl) = self.err_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} from={} to={} -> {}: {err:?}",
self.scheme,
Operation::BlockingCopy,
from,
to,
self.err_status(&err)
);
}
err
})
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
debug!(
target: LOGGING_TARGET,
Expand Down
42 changes: 42 additions & 0 deletions core/src/raw/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
))
}

/// Invoke the `copy` operation on the specified `from` path and `to` path.
///
/// Require [AccessorCapability::Read] and [AccessorCapability::Write]
///
/// # Behaviour
///
/// - `from` and `to` MUST be file path, DON'T NEED to check mode.
/// - Copy on existing file SHOULD succeed.
/// - Copy on existing file SHOULD overwrite and truncate.
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let (_, _, _) = (from, to, args);

Err(Error::new(
ErrorKind::Unsupported,
"operation is not supported",
))
}

/// Invoke the `stat` operation on the specified path.
///
/// Require [`AccessorCapability::Read`]
Expand Down Expand Up @@ -263,6 +281,20 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
))
}

/// Invoke the `blocking_copy` operation on the specified `from` path and `to` path.
///
/// This operation is the blocking version of [`Accessor::copy`]
///
/// Require [`AccessorCapability::Read`], [`AccessorCapability::Write`] and [`AccessorCapability::Blocking`]
fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let (_, _, _) = (from, to, args);

Err(Error::new(
ErrorKind::Unsupported,
"operation is not supported",
))
}

/// Invoke the `blocking_stat` operation on the specified path.
///
/// This operation is the blocking version of [`Accessor::stat`]
Expand Down Expand Up @@ -369,6 +401,11 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.as_ref().write(path, args).await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.as_ref().copy(from, to, args).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.as_ref().stat(path, args).await
}
Expand Down Expand Up @@ -399,6 +436,11 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
self.as_ref().blocking_write(path, args)
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.as_ref().blocking_copy(from, to, args)
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.as_ref().blocking_stat(path, args)
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/raw/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static {

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)>;

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner().copy(from, to, args).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.inner().stat(path, args).await
}
Expand Down Expand Up @@ -187,6 +191,10 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static {

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)>;

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner().blocking_copy(from, to, args)
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.inner().blocking_stat(path, args)
}
Expand Down Expand Up @@ -225,6 +233,10 @@ impl<L: LayeredAccessor> Accessor for L {
(self as &L).write(path, args).await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
(self as &L).copy(from, to, args).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
(self as &L).stat(path, args).await
}
Expand Down Expand Up @@ -261,6 +273,10 @@ impl<L: LayeredAccessor> Accessor for L {
(self as &L).blocking_write(path, args)
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
(self as &L).blocking_copy(from, to, args)
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
(self as &L).blocking_stat(path, args)
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/raw/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub enum Operation {
Read,
/// Operation for [`crate::raw::Accessor::write`]
Write,
/// Operation for [`crate::raw::Accessor::copy`]
Copy,
/// Operation for [`crate::raw::Accessor::stat`]
Stat,
/// Operation for [`crate::raw::Accessor::delete`]
Expand All @@ -48,6 +50,8 @@ pub enum Operation {
BlockingRead,
/// Operation for [`crate::raw::Accessor::blocking_write`]
BlockingWrite,
/// Operation for [`crate::raw::Accessor::blocking_copy`]
BlockingCopy,
/// Operation for [`crate::raw::Accessor::blocking_stat`]
BlockingStat,
/// Operation for [`crate::raw::Accessor::blocking_delete`]
Expand Down Expand Up @@ -84,6 +88,7 @@ impl From<Operation> for &'static str {
Operation::Create => "create",
Operation::Read => "read",
Operation::Write => "write",
Operation::Copy => "copy",
Operation::Stat => "stat",
Operation::Delete => "delete",
Operation::List => "list",
Expand All @@ -93,6 +98,7 @@ impl From<Operation> for &'static str {
Operation::BlockingCreate => "blocking_create",
Operation::BlockingRead => "blocking_read",
Operation::BlockingWrite => "blocking_write",
Operation::BlockingCopy => "blocking_copy",
Operation::BlockingStat => "blocking_stat",
Operation::BlockingDelete => "blocking_delete",
Operation::BlockingList => "blocking_list",
Expand Down
11 changes: 11 additions & 0 deletions core/src/raw/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ impl RpWrite {
}
}

/// Reply for `copy` operation.
#[derive(Debug, Clone, Default)]
pub struct RpCopy {}

impl RpCopy {
/// Create a new reply for copy.
pub fn new() -> Self {
Self {}
}
}

#[cfg(test)]
mod tests {
use anyhow::Result;
Expand Down
21 changes: 21 additions & 0 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,17 @@ impl Accessor for FsBackend {
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
tokio::fs::copy(
self.root.join(from.trim_end_matches('/')),
self.root.join(to.trim_end_matches('/')),
)
.await
.map_err(parse_io_error)?;

Ok(RpCopy::default())
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = self.root.join(path.trim_end_matches('/'));

Expand Down Expand Up @@ -627,6 +638,16 @@ impl Accessor for FsBackend {
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}

fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
std::fs::copy(
self.root.join(from.trim_end_matches('/')),
self.root.join(to.trim_end_matches('/')),
)
.map_err(parse_io_error)?;

Ok(RpCopy::default())
}

fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = self.root.join(path.trim_end_matches('/'));

Expand Down
52 changes: 52 additions & 0 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,58 @@ impl BlockingOperator {
self.write_with(path, OpWrite::new(), bs)
}

/// Copy a file from `from` to `to`.
///
/// # Notes
///
/// - `from` and `to` must be a file.
/// - `to` will be overwritten if it exists.
/// - If `from` and `to` are the same, nothing will happen.
/// - `copy` is idempotent. For same `from` and `to` input, the result will be the same.
///
/// # Examples
///
/// ```
/// # use std::io::Result;
/// # use opendal::BlockingOperator;
///
/// # fn test(op: BlockingOperator) -> Result<()> {
/// op.copy("path/to/file", "path/to/file2")?;
/// # Ok(())
/// # }
/// ```
pub fn copy(&self, from: &str, to: &str) -> Result<()> {
if from == to {
return Ok(());
}

let from = normalize_path(from);

if !validate_path(&from, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "from path is a directory")
.with_operation("Operator::copy")
.with_context("service", self.info().scheme())
.with_context("from", from),
);
}

let to = normalize_path(to);

if !validate_path(&to, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "to path is a directory")
.with_operation("Operator::copy")
.with_context("service", self.info().scheme())
.with_context("to", to),
);
}

self.inner().blocking_copy(&from, &to, OpCopy::new())?;

Ok(())
}

/// Write data with option described in OpenDAL [rfc-0661](../../docs/rfcs/0661-path-in-accessor.md)
///
/// # Notes
Expand Down
Loading

0 comments on commit b5394b6

Please sign in to comment.