Update prefetcher to cancel discarded tasks #505

Merged · 1 commit · Sep 6, 2023

36 changes: 19 additions & 17 deletions mountpoint-s3/src/prefetch.rs
@@ -16,6 +16,7 @@ use std::fmt::Debug;
use std::time::Duration;

use bytes::Bytes;
use futures::future::RemoteHandle;
use futures::pin_mut;
use futures::stream::StreamExt;
use futures::task::{Spawn, SpawnExt};
@@ -182,13 +183,7 @@ where
"out-of-order read, resetting prefetch"
);
counter!("prefetch.out_of_order", 1);
// TODO cancel inflight requests
// TODO see if we can reuse any inflight requests rather than dropping them immediately
self.current_task = None;
self.future_tasks.write().unwrap().drain(..);
self.next_request_size = self.inner.config.first_request_size;
self.next_sequential_read_offset = offset;
self.next_request_offset = offset;
self.reset_prefetch_to_offset(offset);
}
debug_assert_eq!(self.next_sequential_read_offset, offset);

@@ -205,12 +200,7 @@

let part = match current_task.read(to_read as usize).await {
Err(e) => {
// cancel inflight tasks
self.current_task = None;
self.future_tasks.write().unwrap().drain(..);
self.next_request_size = self.inner.config.first_request_size;
self.next_sequential_read_offset = offset;
self.next_request_offset = offset;
self.reset_prefetch_to_offset(offset);
return Err(e);
}
Ok(part) => part,
@@ -330,21 +320,21 @@ where
None => break,
}
}
trace!("finished");
trace!("request finished");
}
}
}
.instrument(span)
};

// TODO hold onto this so we can cancel the task
self.inner.runtime.spawn(request_task).unwrap();

// [read] will reset these if the reader stops making sequential requests
self.next_request_offset += size;
self.next_request_size = self.get_next_request_size();

let task_handle = self.inner.runtime.spawn_with_handle(request_task).unwrap();

Some(RequestTask {
_task_handle: task_handle,
total_size: size as usize,
remaining: size as usize,
part_queue,
@@ -376,11 +366,23 @@ where
next_request_size - remainder
}
}

/// Reset this prefetch request to a new offset, clearing any existing tasks queued.
fn reset_prefetch_to_offset(&mut self, offset: u64) {
// TODO see if we can reuse any inflight requests rather than dropping them immediately
self.current_task = None;
self.future_tasks.write().unwrap().drain(..);
self.next_request_size = self.inner.config.first_request_size;
self.next_sequential_read_offset = offset;
self.next_request_offset = offset;
}
}

/// A single GetObject request submitted to the S3 client
#[derive(Debug)]
struct RequestTask<E> {
/// Handle on the task/future. Future is cancelled when handle is dropped.
_task_handle: RemoteHandle<()>,
remaining: usize,
total_size: usize,
part_queue: PartQueue<E>,
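
The mechanism behind this change is that `SpawnExt::spawn_with_handle` returns a `futures::future::RemoteHandle`, and the spawned future is dropped by the executor once that handle is dropped. Storing the handle on `RequestTask` therefore makes dropping a task (as `reset_prefetch_to_offset` does when it clears `current_task` and drains `future_tasks`) cancel the corresponding in-flight request. The sketch below is a minimal, standalone illustration of that drop-to-cancel behavior, not code from this PR; it assumes only the `futures` crate with its default `executor` feature and uses a oneshot channel purely as a probe to observe that the spawned future was torn down.

```rust
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future;
use futures::task::SpawnExt;

fn main() {
    // Single-threaded executor standing in for the prefetcher's `inner.runtime`.
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // The sender is owned by the spawned future and never used to send; it is
    // dropped only when the future itself is dropped, so the receiver acts as
    // a cancellation probe.
    let (probe_tx, probe_rx) = oneshot::channel::<()>();

    // Unlike plain `spawn`, `spawn_with_handle` returns a `RemoteHandle`, and
    // the spawned future keeps running only while that handle is alive.
    let handle = spawner
        .spawn_with_handle(async move {
            let _owned = probe_tx;
            // Stand-in for streaming a GetObject body: never completes on its own.
            future::pending::<()>().await;
        })
        .unwrap();

    // Dropping the handle tells the executor to drop the remote future -- the
    // same mechanism `RequestTask::_task_handle` relies on.
    drop(handle);

    // The future was dropped without sending, so the probe resolves to
    // `Err(Canceled)`.
    assert!(pool.run_until(probe_rx).is_err());
    println!("spawned task was cancelled when its RemoteHandle was dropped");
}
```

Compared with the previous detached `spawn`, this ties the lifetime of the background work to the lifetime of the `RequestTask`: out-of-order reads and read errors cancel still-running requests simply by dropping the queued tasks, with no explicit cancel call.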