Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Polish internal types and remove not needed deps #3964

Merged
merged 7 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,8 @@ openssh-sftp-client = { version = "0.14.0", optional = true, features = [
"tracing",
] }
opentelemetry = { version = "0.21.0", optional = true }
parking_lot = "0.12"
percent-encoding = "2"
persy = { version = "1.4.6", optional = true }
pin-project = "1"
prometheus = { version = "0.13", features = ["process"], optional = true }
prometheus-client = { version = "0.22.0", optional = true }
prost = { version = "0.11", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/enum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
//! ```txt
//! impl Accessor for OssBackend {
//! type Writer = raw::TwoWays<
//! oio::MultipartUploadWriter<OssWriter>,
//! oio::AppendObjectWriter<OssWriter>,
//! oio::MultipartWriter<OssWriter>,
//! oio::AppendWriter<OssWriter>,
//! >;
//! }
//! ```
Expand Down
9 changes: 2 additions & 7 deletions core/src/raw/oio/list/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use pin_project::pin_project;
morristai marked this conversation as resolved.
Show resolved Hide resolved

use crate::raw::oio::Entry;
use crate::*;

Expand Down Expand Up @@ -105,8 +103,6 @@ pub trait ListExt: List {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, L: List + Unpin + ?Sized> {
lister: &'a mut L,
}
Expand All @@ -117,9 +113,8 @@ where
{
type Output = Result<Option<Entry>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> {
let this = self.project();
Pin::new(this.lister).poll_next(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> {
self.lister.poll_next(cx)
}
}

Expand Down
24 changes: 7 additions & 17 deletions core/src/raw/oio/read/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::task::Poll;

use bytes::Bytes;
use futures::Future;
use pin_project::pin_project;
use tokio::io::ReadBuf;

use crate::*;
Expand Down Expand Up @@ -207,8 +206,6 @@ pub trait ReadExt: Read {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ReadFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
Expand All @@ -221,13 +218,11 @@ where
type Output = Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
Pin::new(this.reader).poll_read(cx, this.buf)
let this = self.get_mut();
this.reader.poll_read(cx, this.buf)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct SeekFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
pos: io::SeekFrom,
Expand All @@ -240,13 +235,11 @@ where
type Output = Result<u64>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> {
let this = self.project();
Pin::new(this.reader).poll_seek(cx, *this.pos)
let this = self.get_mut();
this.reader.poll_seek(cx, this.pos)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
}
Expand All @@ -257,14 +250,11 @@ where
{
type Output = Option<Result<Bytes>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
let this = self.project();
Pin::new(this.reader).poll_next(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.reader.poll_next(cx)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
Expand All @@ -277,7 +267,7 @@ where
type Output = Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
let this = self.get_mut();
let start_len = this.buf.len();

loop {
Expand Down
8 changes: 2 additions & 6 deletions core/src/raw/oio/stream/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::task::Poll;

use bytes::Bytes;
use bytes::BytesMut;
use pin_project::pin_project;

use crate::*;

Expand Down Expand Up @@ -105,8 +104,6 @@ pub trait StreamExt: Stream {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> {
inner: &'a mut T,
}
Expand All @@ -117,9 +114,8 @@ where
{
type Output = Option<Result<Bytes>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
let this = self.project();
Pin::new(this.inner).poll_next(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.poll_next(cx)
}
}

Expand Down
30 changes: 11 additions & 19 deletions core/src/raw/oio/write/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use pin_project::pin_project;

use crate::raw::*;
use crate::*;

Expand All @@ -33,10 +31,11 @@ use crate::*;
pub enum WriteOperation {
/// Operation for [`Write::write`]
Write,
/// Operation for [`Write::abort`]
Abort,
/// Operation for [`Write::close`]
Close,
/// Operation for [`Write::abort`]
Abort,

/// Operation for [`BlockingWrite::write`]
BlockingWrite,
/// Operation for [`BlockingWrite::close`]
Expand All @@ -62,8 +61,9 @@ impl From<WriteOperation> for &'static str {

match v {
Write => "Writer::write",
Abort => "Writer::abort",
Close => "Writer::close",
Abort => "Writer::abort",

BlockingWrite => "BlockingWriter::write",
BlockingClose => "BlockingWriter::close",
}
Expand Down Expand Up @@ -151,8 +151,6 @@ pub trait WriteExt: Write {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct WriteFuture<'a, W: Write + Unpin + ?Sized> {
writer: &'a mut W,
buf: &'a dyn oio::WriteBuf,
Expand All @@ -165,13 +163,11 @@ where
type Output = Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
Pin::new(this.writer).poll_write(cx, *this.buf)
let this = self.get_mut();
this.writer.poll_write(cx, this.buf)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct AbortFuture<'a, W: Write + Unpin + ?Sized> {
writer: &'a mut W,
}
Expand All @@ -182,14 +178,11 @@ where
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
Pin::new(this.writer).poll_abort(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.writer.poll_abort(cx)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct CloseFuture<'a, W: Write + Unpin + ?Sized> {
writer: &'a mut W,
}
Expand All @@ -200,9 +193,8 @@ where
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
Pin::new(this.writer).poll_close(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.writer.poll_close(cx)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,25 @@ use async_trait::async_trait;
use crate::raw::*;
use crate::*;

/// AppendObjectWrite is used to implement [`Write`] based on append
/// object. By implementing AppendObjectWrite, services don't need to
/// AppendWrite is used to implement [`Write`] based on append
/// object. By implementing AppendWrite, services don't need to
/// care about the details of buffering and uploading parts.
///
/// The layout after adopting [`AppendObjectWrite`]:
/// The layout after adopting [`AppendWrite`]:
///
/// - Services impl `AppendObjectWrite`
/// - `AppendObjectWriter` impl `Write`
/// - Expose `AppendObjectWriter` as `Accessor::Writer`
/// - Services impl `AppendWrite`
/// - `AppendWriter` impl `Write`
/// - Expose `AppendWriter` as `Accessor::Writer`
///
/// ## Requirements
///
/// Services that implement `AppendWrite` must fulfill the following requirements:
///
/// - Must be a http service that could accept `AsyncBody`.
/// - Provide a way to get the current offset of the append object.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait AppendObjectWrite: Send + Sync + Unpin + 'static {
pub trait AppendWrite: Send + Sync + Unpin + 'static {
/// Get the current offset of the append object.
///
/// Returns `0` if the object is not exist.
Expand All @@ -45,12 +52,12 @@ pub trait AppendObjectWrite: Send + Sync + Unpin + 'static {
async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()>;
}

/// AppendObjectWriter will implements [`Write`] based on append object.
/// AppendWriter will implements [`Write`] based on append object.
///
/// ## TODO
///
/// - Allow users to switch to un-buffered mode if users write 16MiB every time.
pub struct AppendObjectWriter<W: AppendObjectWrite> {
pub struct AppendWriter<W: AppendWrite> {
state: State<W>,

offset: Option<u64>,
Expand All @@ -65,15 +72,15 @@ enum State<W> {
/// # Safety
///
/// wasm32 is a special target that we only have one event-loop for this state.
unsafe impl<S: AppendObjectWrite> Send for State<S> {}
unsafe impl<S: AppendWrite> Send for State<S> {}

/// # Safety
///
/// We will only take `&mut Self` reference for State.
unsafe impl<S: AppendObjectWrite> Sync for State<S> {}
unsafe impl<S: AppendWrite> Sync for State<S> {}

impl<W: AppendObjectWrite> AppendObjectWriter<W> {
/// Create a new AppendObjectWriter.
impl<W: AppendWrite> AppendWriter<W> {
/// Create a new AppendWriter.
pub fn new(inner: W) -> Self {
Self {
state: State::Idle(Some(inner)),
Expand All @@ -82,9 +89,9 @@ impl<W: AppendObjectWrite> AppendObjectWriter<W> {
}
}

impl<W> oio::Write for AppendObjectWriter<W>
impl<W> oio::Write for AppendWriter<W>
where
W: AppendObjectWrite,
W: AppendWrite,
{
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
loop {
Expand Down
9 changes: 9 additions & 0 deletions core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ use crate::*;
/// ```
///
/// We will use `write_once` instead of starting a new block upload.
///
/// # Requirements
///
/// Services that implement `BlockWrite` must fulfill the following requirements:
///
/// - Must be a http service that could accept `AsyncBody`.
/// - Don't need initialization before writing.
/// - Block ID is generated by caller `BlockWrite` instead of services.
/// - Complete block by an ordered block id list.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BlockWrite: Send + Sync + Unpin + 'static {
Expand Down
14 changes: 7 additions & 7 deletions core/src/raw/oio/write/exact_buf_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
Poll::Ready(Ok(written))
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.buffer.clear();
self.inner.poll_abort(cx)
}

fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
while !self.buffer.is_empty() {
let n = ready!(self.inner.poll_write(cx, &self.buffer))?;
Expand All @@ -78,6 +73,11 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {

self.inner.poll_close(cx)
}

fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.buffer.clear();
self.inner.poll_abort(cx)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -110,11 +110,11 @@ mod tests {
Poll::Ready(Ok(bs.chunk().len()))
}

fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
}
Expand Down
Loading