From 4e5ee14f6e3ffef43862ebd8e5d0bf61d56f26b4 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Wed, 6 Sep 2023 10:44:09 +0100 Subject: [PATCH] Cancel unused in-flight prefetch tasks Previously, mountpoint-s3 would not cancel prefetch tasks that it was going to ignore. Instead, they would continue to be polled by the executor despite the results never being checked. This change ensures that the task handles are dropped which cancels the task/future. In the future, we may want to retain some of these tasks where the prefetcher may still be able to make use of them. Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/prefetch.rs | 36 ++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index c33e394b3..96fe01be1 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::time::Duration; use bytes::Bytes; +use futures::future::RemoteHandle; use futures::pin_mut; use futures::stream::StreamExt; use futures::task::{Spawn, SpawnExt}; @@ -182,13 +183,7 @@ where "out-of-order read, resetting prefetch" ); counter!("prefetch.out_of_order", 1); - // TODO cancel inflight requests - // TODO see if we can reuse any inflight requests rather than dropping them immediately - self.current_task = None; - self.future_tasks.write().unwrap().drain(..); - self.next_request_size = self.inner.config.first_request_size; - self.next_sequential_read_offset = offset; - self.next_request_offset = offset; + self.reset_prefetch_to_offset(offset); } debug_assert_eq!(self.next_sequential_read_offset, offset); @@ -205,12 +200,7 @@ where let part = match current_task.read(to_read as usize).await { Err(e) => { - // cancel inflight tasks - self.current_task = None; - self.future_tasks.write().unwrap().drain(..); - self.next_request_size = self.inner.config.first_request_size; - self.next_sequential_read_offset = offset; - self.next_request_offset = offset; + self.reset_prefetch_to_offset(offset); return Err(e); } Ok(part) => part, @@ -330,21 +320,21 @@ where None => break, } } - trace!("finished"); + trace!("request finished"); } } } .instrument(span) }; - // TODO hold onto this so we can cancel the task - self.inner.runtime.spawn(request_task).unwrap(); - // [read] will reset these if the reader stops making sequential requests self.next_request_offset += size; self.next_request_size = self.get_next_request_size(); + let task_handle = self.inner.runtime.spawn_with_handle(request_task).unwrap(); + Some(RequestTask { + _task_handle: task_handle, total_size: size as usize, remaining: size as usize, part_queue, @@ -376,11 +366,23 @@ where next_request_size - remainder } } + + /// Reset this prefetch request to a new offset, clearing any existing tasks queued. + fn reset_prefetch_to_offset(&mut self, offset: u64) { + // TODO see if we can reuse any inflight requests rather than dropping them immediately + self.current_task = None; + self.future_tasks.write().unwrap().drain(..); + self.next_request_size = self.inner.config.first_request_size; + self.next_sequential_read_offset = offset; + self.next_request_offset = offset; + } } /// A single GetObject request submitted to the S3 client #[derive(Debug)] struct RequestTask { + /// Handle on the task/future. Future is cancelled when handle is dropped. + _task_handle: RemoteHandle<()>, remaining: usize, total_size: usize, part_queue: PartQueue,