Skip to content

Commit

Permalink
refactor: Polish concurrent list (apache#3658)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Nov 24, 2023
1 parent 44f06f4 commit 9cf115c
Showing 1 changed file with 70 additions and 31 deletions.
101 changes: 70 additions & 31 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,58 @@ pub struct Lister {
/// required_metakey is the metakey required by users.
required_metakey: FlagSet<Metakey>,

/// task_queue is used to store tasks that are run in concurrent.
task_queue: VecDeque<StatTask>,
/// tasks is used to store tasks that are run concurrently.
tasks: VecDeque<StatTask>,
errored: bool,
}

/// StatTask is used to store the task that is run concurrently.
///
/// # Note for clippy
///
/// Clippy will raise an error for this enum like the following:
///
/// ```shell
/// error: large size difference between variants
/// --> core/src/types/list.rs:64:1
/// |
/// 64 | / enum StatTask {
/// 65 | | /// Handle is used to store the join handle of spawned task.
/// 66 | | Handle(JoinHandle<(String, Result<RpStat>)>),
/// | | -------------------------------------------- the second-largest variant contains at least 0 bytes
/// 67 | | /// KnownEntry is used to store the entry that already contains the required metakey.
/// 68 | | KnownEntry(Option<Entry>),
/// | | ------------------------- the largest variant contains at least 264 bytes
/// 69 | | }
/// | |_^ the entire enum is at least 0 bytes
/// |
/// = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#large_enum_variant
/// = note: `-D clippy::large-enum-variant` implied by `-D warnings`
/// = help: to override `-D warnings` add `#[allow(clippy::large_enum_variant)]`
/// help: consider boxing the large fields to reduce the total size of the enum
/// |
/// 68 | KnownEntry(Box<Option<Entry>>),
/// | ~~~~~~~~~~~~~~~~~~
/// ```
/// But this lint is misleading here since it doesn't take the generic parameter of JoinHandle into account. In fact,
/// the task output and Option<Entry> have exactly the same size:
///
/// ```rust
/// use std::mem::size_of;
/// use opendal::Result;
/// use opendal::Entry;
///
/// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(264, size_of::<Option<Entry>>());
/// ```
///
/// So let's ignore this lint:
#[allow(clippy::large_enum_variant)]
enum StatTask {
/// Handle is used to store the join handle of spawned task.
Handle(JoinHandle<(String, Result<RpStat>)>),
/// KnownEntry is used to store the entry that already contains the required metakey.
KnownEntry(Box<Option<(String, Metadata)>>),
/// Stating is used to store the join handle of the spawned task.
Stating(JoinHandle<(String, Result<RpStat>)>),
/// Known is used to store the entry that already contains the required metakey.
Known(Option<Entry>),
}

/// # Safety
Expand All @@ -74,7 +116,7 @@ impl Lister {
lister: Some(lister),
required_metakey,

task_queue: VecDeque::with_capacity(concurrent),
tasks: VecDeque::with_capacity(concurrent),
errored: false,
})
}
Expand All @@ -89,31 +131,23 @@ impl Stream for Lister {
return Poll::Ready(None);
}

let task_queue_len = self.task_queue.len();
let task_queue_cap = self.task_queue.capacity();

if let Some(lister) = self.lister.as_mut() {
if task_queue_len < task_queue_cap {
// Try to pull more tasks if there is more space.
if self.tasks.len() < self.tasks.capacity() {
if let Some(lister) = self.lister.as_mut() {
match lister.poll_next(cx) {
Poll::Pending => {
if task_queue_len == 0 {
return Poll::Pending;
}
}
Poll::Pending => {}
Poll::Ready(Ok(Some(oe))) => {
let (path, metadata) = oe.into_entry().into_parts();
// TODO: we can optimize this by checking the metakey provided by services.
if metadata.contains_metakey(self.required_metakey) {
self.task_queue
.push_back(StatTask::KnownEntry(Box::new(Some((path, metadata)))));
self.tasks
.push_back(StatTask::Known(Some(Entry::new(path, metadata))));
} else {
let acc = self.acc.clone();
let fut = async move {
let res = acc.stat(&path, OpStat::default()).await;
(path, res)
};
self.task_queue
.push_back(StatTask::Handle(tokio::spawn(fut)));
self.tasks.push_back(StatTask::Stating(tokio::spawn(fut)));
}
}
Poll::Ready(Ok(None)) => {
Expand All @@ -127,14 +161,16 @@ impl Stream for Lister {
}
}

if let Some(handle) = self.task_queue.front_mut() {
if let Some(handle) = self.tasks.front_mut() {
return match handle {
StatTask::Handle(handle) => {
StatTask::Stating(handle) => {
let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?;

// Make sure this task has been popped after it's ready.
self.tasks.pop_front();

match rp {
Ok(rp) => {
self.task_queue.pop_front();
let metadata = rp.into_metadata();
Poll::Ready(Some(Ok(Entry::new(path, metadata))))
}
Expand All @@ -144,15 +180,19 @@ impl Stream for Lister {
}
}
}
StatTask::KnownEntry(entry) => {
let (path, metadata) = entry.take().expect("entry must be valid");
self.task_queue.pop_front();
Poll::Ready(Some(Ok(Entry::new(path, metadata))))
StatTask::Known(entry) => {
let entry = entry.take().expect("entry must be valid");
self.tasks.pop_front();
Poll::Ready(Some(Ok(entry)))
}
};
}

Poll::Ready(None)
if self.lister.is_none() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}

Expand Down Expand Up @@ -213,7 +253,6 @@ impl Iterator for BlockingLister {
};

let (path, metadata) = entry.into_entry().into_parts();
// TODO: we can optimize this by checking the provided metakey provided by services.
if metadata.contains_metakey(self.required_metakey) {
return Some(Ok(Entry::new(path, metadata)));
}
Expand Down

0 comments on commit 9cf115c

Please sign in to comment.