Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add Copy Support #1841

Merged
merged 3 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,47 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
})
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
suyanhanx marked this conversation as resolved.
Show resolved Hide resolved
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> started",
self.scheme,
Operation::Copy,
from,
to
);

self.inner
.copy(from, to, args)
.await
.map(|v| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> finished",
self.scheme,
Operation::Copy,
from,
to
);
v
})
.map_err(|err| {
if let Some(lvl) = self.err_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} from={} to={} -> {}: {err:?}",
self.scheme,
Operation::Copy,
from,
to,
self.err_status(&err)
)
};
err
})
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
debug!(
target: LOGGING_TARGET,
Expand Down Expand Up @@ -699,6 +740,46 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
})
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> started",
self.scheme,
Operation::BlockingCopy,
from,
to,
);

self.inner
.blocking_copy(from, to, args)
.map(|v| {
debug!(
target: LOGGING_TARGET,
"service={} operation={} from={} to={} -> finished",
self.scheme,
Operation::BlockingCopy,
from,
to,
);
v
})
.map_err(|err| {
if let Some(lvl) = self.err_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} from={} to={} -> {}: {err:?}",
self.scheme,
Operation::BlockingCopy,
from,
to,
self.err_status(&err)
);
}
err
})
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
debug!(
target: LOGGING_TARGET,
Expand Down
42 changes: 42 additions & 0 deletions core/src/raw/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
))
}

/// Invoke the `copy` operation on the specified `from` path and `to` path.
///
/// Require [AccessorCapability::Read] and [AccessorCapability::Write]
///
/// # Behaviour
///
/// - `from` and `to` MUST be file path, DON'T NEED to check mode.
/// - Copy on existing file SHOULD succeed.
/// - Copy on existing file SHOULD overwrite and truncate.
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let (_, _, _) = (from, to, args);

Err(Error::new(
ErrorKind::Unsupported,
"operation is not supported",
))
}

/// Invoke the `stat` operation on the specified path.
///
/// Require [`AccessorCapability::Read`]
Expand Down Expand Up @@ -263,6 +281,20 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static {
))
}

/// Invoke the `blocking_copy` operation on the specified `from` path and `to` path.
///
/// This operation is the blocking version of [`Accessor::copy`]
///
/// Require [`AccessorCapability::Read`], [`AccessorCapability::Write`] and [`AccessorCapability::Blocking`]
fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let (_, _, _) = (from, to, args);

Err(Error::new(
ErrorKind::Unsupported,
"operation is not supported",
))
}

/// Invoke the `blocking_stat` operation on the specified path.
///
/// This operation is the blocking version of [`Accessor::stat`]
Expand Down Expand Up @@ -369,6 +401,11 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.as_ref().write(path, args).await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.as_ref().copy(from, to, args).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.as_ref().stat(path, args).await
}
Expand Down Expand Up @@ -399,6 +436,11 @@ impl<T: Accessor + ?Sized> Accessor for Arc<T> {
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
self.as_ref().blocking_write(path, args)
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.as_ref().blocking_copy(from, to, args)
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.as_ref().blocking_stat(path, args)
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/raw/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static {

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)>;

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner().copy(from, to, args).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.inner().stat(path, args).await
}
Expand Down Expand Up @@ -187,6 +191,10 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static {

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)>;

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner().blocking_copy(from, to, args)
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.inner().blocking_stat(path, args)
}
Expand Down Expand Up @@ -225,6 +233,10 @@ impl<L: LayeredAccessor> Accessor for L {
(self as &L).write(path, args).await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
(self as &L).copy(from, to, args).await
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
(self as &L).stat(path, args).await
}
Expand Down Expand Up @@ -261,6 +273,10 @@ impl<L: LayeredAccessor> Accessor for L {
(self as &L).blocking_write(path, args)
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
(self as &L).blocking_copy(from, to, args)
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
(self as &L).blocking_stat(path, args)
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/raw/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub enum Operation {
Read,
/// Operation for [`crate::raw::Accessor::write`]
Write,
/// Operation for [`crate::raw::Accessor::copy`]
Copy,
/// Operation for [`crate::raw::Accessor::stat`]
Stat,
/// Operation for [`crate::raw::Accessor::delete`]
Expand All @@ -48,6 +50,8 @@ pub enum Operation {
BlockingRead,
/// Operation for [`crate::raw::Accessor::blocking_write`]
BlockingWrite,
/// Operation for [`crate::raw::Accessor::blocking_copy`]
BlockingCopy,
/// Operation for [`crate::raw::Accessor::blocking_stat`]
BlockingStat,
/// Operation for [`crate::raw::Accessor::blocking_delete`]
Expand Down Expand Up @@ -84,6 +88,7 @@ impl From<Operation> for &'static str {
Operation::Create => "create",
Operation::Read => "read",
Operation::Write => "write",
Operation::Copy => "copy",
Operation::Stat => "stat",
Operation::Delete => "delete",
Operation::List => "list",
Expand All @@ -93,6 +98,7 @@ impl From<Operation> for &'static str {
Operation::BlockingCreate => "blocking_create",
Operation::BlockingRead => "blocking_read",
Operation::BlockingWrite => "blocking_write",
Operation::BlockingCopy => "blocking_copy",
Operation::BlockingStat => "blocking_stat",
Operation::BlockingDelete => "blocking_delete",
Operation::BlockingList => "blocking_list",
Expand Down
11 changes: 11 additions & 0 deletions core/src/raw/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ impl RpWrite {
}
}

/// Reply for `copy` operation.
#[derive(Debug, Clone, Default)]
pub struct RpCopy {}

impl RpCopy {
/// Create a new reply for copy.
pub fn new() -> Self {
Self {}
}
}

#[cfg(test)]
mod tests {
use anyhow::Result;
Expand Down
21 changes: 21 additions & 0 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,17 @@ impl Accessor for FsBackend {
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
suyanhanx marked this conversation as resolved.
Show resolved Hide resolved
tokio::fs::copy(
self.root.join(from.trim_end_matches('/')),
self.root.join(to.trim_end_matches('/')),
)
.await
.map_err(parse_io_error)?;

Ok(RpCopy::default())
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = self.root.join(path.trim_end_matches('/'));

Expand Down Expand Up @@ -627,6 +638,16 @@ impl Accessor for FsBackend {
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}

fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
std::fs::copy(
self.root.join(from.trim_end_matches('/')),
self.root.join(to.trim_end_matches('/')),
)
.map_err(parse_io_error)?;

Ok(RpCopy::default())
}

fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = self.root.join(path.trim_end_matches('/'));

Expand Down
52 changes: 52 additions & 0 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,58 @@ impl BlockingOperator {
self.write_with(path, OpWrite::new(), bs)
}

/// Copy a file from `from` to `to`.
///
/// # Notes
///
/// - `from` and `to` must be a file.
/// - `to` will be overwritten if it exists.
/// - If `from` and `to` are the same, nothing will happen.
/// - `copy` is idempotent. For same `from` and `to` input, the result will be the same.
///
/// # Examples
///
/// ```
/// # use std::io::Result;
/// # use opendal::BlockingOperator;
///
/// # fn test(op: BlockingOperator) -> Result<()> {
/// op.copy("path/to/file", "path/to/file2")?;
/// # Ok(())
/// # }
/// ```
pub fn copy(&self, from: &str, to: &str) -> Result<()> {
if from == to {
suyanhanx marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
}

let from = normalize_path(from);

if !validate_path(&from, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "from path is a directory")
.with_operation("Operator::copy")
.with_context("service", self.info().scheme())
.with_context("from", from),
);
}

let to = normalize_path(to);

if !validate_path(&to, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "to path is a directory")
.with_operation("Operator::copy")
.with_context("service", self.info().scheme())
.with_context("to", to),
);
}

self.inner().blocking_copy(&from, &to, OpCopy::new())?;

Ok(())
}

/// Write data with option described in OpenDAL [rfc-0661](../../docs/rfcs/0661-path-in-accessor.md)
///
/// # Notes
Expand Down
5 changes: 5 additions & 0 deletions core/src/types/operator/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ impl OperatorInfo {
self.0.capabilities().contains(AccessorCapability::Write)
}

/// Check if current backend supports [`Accessor::copy`] or not.
pub fn can_copy(&self) -> bool {
suyanhanx marked this conversation as resolved.
Show resolved Hide resolved
self.0.capabilities().contains(AccessorCapability::Read | AccessorCapability::Write)
}

/// Check if current backend supports [`Accessor::list`] or not.
pub fn can_list(&self) -> bool {
self.0.capabilities().contains(AccessorCapability::List)
Expand Down
Loading