-
Notifications
You must be signed in to change notification settings - Fork 171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce new abstraction between the prefetcher and GetObject calls #552
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
@@ -8,27 +8,25 @@ | |
//! non-sequential read, we abandon the prefetching and start again with the minimum request size. | ||
|
||
pub mod checksummed_bytes; | ||
mod feed; | ||
mod part; | ||
mod part_queue; | ||
|
||
use std::collections::VecDeque; | ||
use std::fmt::Debug; | ||
use std::time::Duration; | ||
|
||
use bytes::Bytes; | ||
use futures::future::RemoteHandle; | ||
use futures::pin_mut; | ||
use futures::stream::StreamExt; | ||
use futures::task::{Spawn, SpawnExt}; | ||
use metrics::counter; | ||
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; | ||
use mountpoint_s3_client::types::ETag; | ||
use mountpoint_s3_client::ObjectClient; | ||
use mountpoint_s3_crt::checksums::crc32c; | ||
use thiserror::Error; | ||
use tracing::{debug_span, error, trace, Instrument}; | ||
|
||
use crate::prefetch::checksummed_bytes::{ChecksummedBytes, IntegrityError}; | ||
use crate::prefetch::feed::{ClientPartFeed, ObjectPartFeed}; | ||
use crate::prefetch::part::Part; | ||
use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue}; | ||
use crate::sync::{Arc, RwLock}; | ||
|
@@ -45,8 +43,6 @@ pub struct PrefetcherConfig { | |
pub sequential_prefetch_multiplier: usize, | ||
/// Timeout to wait for a part to become available | ||
pub read_timeout: Duration, | ||
/// The size of the parts that the prefetcher is trying to align with | ||
pub part_alignment: usize, | ||
} | ||
|
||
impl Default for PrefetcherConfig { | ||
|
@@ -65,7 +61,6 @@ impl Default for PrefetcherConfig { | |
max_request_size: 2 * 1024 * 1024 * 1024, | ||
sequential_prefetch_multiplier: 8, | ||
read_timeout: Duration::from_secs(60), | ||
part_alignment: 8 * 1024 * 1024, | ||
} | ||
} | ||
} | ||
|
@@ -76,22 +71,28 @@ pub struct Prefetcher<Client, Runtime> { | |
inner: Arc<PrefetcherInner<Client, Runtime>>, | ||
} | ||
|
||
#[derive(Debug)] | ||
struct PrefetcherInner<Client, Runtime> { | ||
client: Arc<Client>, | ||
part_feed: Arc<dyn ObjectPartFeed<Client> + Send + Sync>, | ||
> **Review comment:** Any thoughts on whether this should be a generic type rather than dynamic dispatch? I guess we can get away with it here (unlike … [comment truncated in page capture])
>
> **Reply:** It is a pragmatic choice which we can revert later on. The downside of a new generic type here is that it would propagate up to the root … [comment truncated in page capture]
||
config: PrefetcherConfig, | ||
runtime: Runtime, | ||
} | ||
|
||
impl<Client, Runtime> Debug for PrefetcherInner<Client, Runtime> { | ||
> **Review comment:** `derive(Debug)` did not seem to work with … [comment truncated in page capture]
>
> **Reply:** It's because the `Debug` impl for … [comment truncated in page capture]
||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("PrefetcherInner").field("config", &self.config).finish() | ||
} | ||
} | ||
|
||
impl<Client, Runtime> Prefetcher<Client, Runtime> | ||
where | ||
Client: ObjectClient + Send + Sync + 'static, | ||
Runtime: Spawn, | ||
{ | ||
/// Create a new [Prefetcher] that will make requests to the given client. | ||
pub fn new(client: Arc<Client>, runtime: Runtime, config: PrefetcherConfig) -> Self { | ||
let part_feed = Arc::new(ClientPartFeed::new(client)); | ||
let inner = PrefetcherInner { | ||
client, | ||
part_feed, | ||
config, | ||
runtime, | ||
}; | ||
|
@@ -101,7 +102,7 @@ where | |
|
||
/// Start a new get request to the specified object. | ||
pub fn get(&self, bucket: &str, key: &str, size: u64, etag: ETag) -> PrefetchGetObject<Client, Runtime> { | ||
PrefetchGetObject::new(Arc::clone(&self.inner), bucket, key, size, etag) | ||
PrefetchGetObject::new(self.inner.clone(), bucket, key, size, etag) | ||
} | ||
} | ||
|
||
|
@@ -276,55 +277,16 @@ where | |
trace!(?range, size, "spawning request"); | ||
|
||
let request_task = { | ||
let client = Arc::clone(&self.inner.client); | ||
let feed = self.inner.part_feed.clone(); | ||
let preferred_part_size = self.preferred_part_size; | ||
let bucket = self.bucket.to_owned(); | ||
let key = self.key.to_owned(); | ||
let etag = self.etag.clone(); | ||
let span = debug_span!("prefetch", range=?range); | ||
|
||
async move { | ||
match client.get_object(&bucket, &key, Some(range.clone()), Some(etag)).await { | ||
Err(e) => { | ||
error!(error=?e, "RequestTask get object failed"); | ||
part_queue_producer.push(Err(e)); | ||
} | ||
Ok(request) => { | ||
pin_mut!(request); | ||
loop { | ||
match request.next().await { | ||
Some(Ok((offset, body))) => { | ||
// pre-split the body into multiple parts as suggested by preferred part size | ||
// in order to avoid validating checksum on large parts at read. | ||
assert!(preferred_part_size > 0); | ||
let mut body: Bytes = body.into(); | ||
let mut curr_offset = offset; | ||
loop { | ||
let chunk_size = preferred_part_size.min(body.len()); | ||
if chunk_size == 0 { | ||
break; | ||
} | ||
let chunk = body.split_to(chunk_size); | ||
// S3 doesn't provide checksum for us if the request range is not aligned to object part boundaries, | ||
// so we're computing our own checksum here. | ||
let checksum = crc32c::checksum(&chunk); | ||
let checksum_bytes = ChecksummedBytes::new(chunk, checksum); | ||
let part = Part::new(&key, curr_offset, checksum_bytes); | ||
curr_offset += part.len() as u64; | ||
part_queue_producer.push(Ok(part)); | ||
} | ||
} | ||
Some(Err(e)) => { | ||
error!(error=?e, "RequestTask body part failed"); | ||
part_queue_producer.push(Err(e)); | ||
break; | ||
} | ||
None => break, | ||
} | ||
} | ||
trace!("request finished"); | ||
} | ||
} | ||
feed.get_object_parts(&bucket, &key, range, etag, preferred_part_size, part_queue_producer) | ||
.await | ||
} | ||
.instrument(span) | ||
}; | ||
|
@@ -344,29 +306,14 @@ where | |
} | ||
|
||
/// Suggest next request size. | ||
/// Normally, next request size is current request size multiply by sequential prefetch multiplier, | ||
/// but if the request size is getting bigger than a part size we will try to align it to part boundaries. | ||
/// The next request size is the current request size multiplied by sequential prefetch multiplier. | ||
fn get_next_request_size(&self) -> usize { | ||
// calculate next request size | ||
let next_request_size = (self.next_request_size * self.inner.config.sequential_prefetch_multiplier) | ||
.min(self.inner.config.max_request_size); | ||
|
||
let offset_in_part = (self.next_request_offset % self.inner.config.part_alignment as u64) as usize; | ||
// if the offset is not at the start of the part we will drain all the bytes from that part first | ||
if offset_in_part != 0 { | ||
let remaining_in_part = self.inner.config.part_alignment - offset_in_part; | ||
next_request_size.min(remaining_in_part) | ||
} else { | ||
// if the next request size is smaller than the part size, just return that value | ||
if next_request_size < self.inner.config.part_alignment { | ||
return next_request_size; | ||
} | ||
|
||
// if it exceeds part boundaries, trim it to the part boundaries | ||
let next_request_boundary = self.next_request_offset + next_request_size as u64; | ||
let remainder = (next_request_boundary % self.inner.config.part_alignment as u64) as usize; | ||
next_request_size - remainder | ||
} | ||
self.inner | ||
.part_feed | ||
.get_aligned_request_size(self.next_request_offset, next_request_size) | ||
} | ||
|
||
/// Reset this prefetch request to a new offset, clearing any existing tasks queued. | ||
|
@@ -457,7 +404,6 @@ mod tests { | |
max_request_size: test_config.max_request_size, | ||
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, | ||
read_timeout: Duration::from_secs(5), | ||
part_alignment: test_config.client_part_size, | ||
}; | ||
let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); | ||
let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); | ||
|
@@ -608,7 +554,6 @@ mod tests { | |
sequential_prefetch_multiplier: prefetch_multiplier, | ||
max_request_size, | ||
read_timeout: Duration::from_secs(60), | ||
part_alignment: part_size, | ||
}; | ||
let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); | ||
let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); | ||
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -0,0 +1,127 @@ | ||
use std::{fmt::Debug, ops::Range, sync::Arc}; | ||
|
||
use async_trait::async_trait; | ||
use bytes::Bytes; | ||
use futures::{pin_mut, StreamExt}; | ||
use mountpoint_s3_client::{ | ||
error::{GetObjectError, ObjectClientError}, | ||
types::ETag, | ||
ObjectClient, | ||
}; | ||
use mountpoint_s3_crt::checksums::crc32c; | ||
use tracing::{error, trace}; | ||
|
||
use crate::prefetch::{checksummed_bytes::ChecksummedBytes, part::Part, part_queue::PartQueueProducer}; | ||
|
||
/// A generic interface to retrieve data from objects in a S3-like store. | ||
#[async_trait] | ||
pub trait ObjectPartFeed<Client: ObjectClient> { | ||
/// Get the content of an object in fixed size parts. The parts are pushed to the provided `part_sink` | ||
/// and are guaranteed to be contiguous and in the correct order. Callers need to specify a preferred | ||
/// size for the parts, but implementations are allowed to ignore it. | ||
async fn get_object_parts( | ||
&self, | ||
bucket: &str, | ||
key: &str, | ||
range: Range<u64>, | ||
if_match: ETag, | ||
preferred_part_size: usize, | ||
part_sink: PartQueueProducer<ObjectClientError<GetObjectError, Client::ClientError>>, | ||
); | ||
|
||
/// Adjust the size of a request to align to optimal part boundaries for this client. | ||
fn get_aligned_request_size(&self, offset: u64, preferred_size: usize) -> usize; | ||
} | ||
|
||
/// [ObjectPartFeed] implementation which delegates retrieving object data to a [Client]. | ||
#[derive(Debug)] | ||
pub struct ClientPartFeed<Client> { | ||
client: Arc<Client>, | ||
} | ||
Comment on lines
+36
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where do we plan to put the cache-based implementation of this? In this file also? I'm wondering if there's a good way to break down this file. |
||
|
||
impl<Client> ClientPartFeed<Client> { | ||
pub fn new(client: Arc<Client>) -> Self { | ||
Self { client } | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl<Client> ObjectPartFeed<Client> for ClientPartFeed<Client> | ||
where | ||
Client: ObjectClient + Send + Sync + 'static, | ||
{ | ||
async fn get_object_parts( | ||
&self, | ||
bucket: &str, | ||
key: &str, | ||
range: Range<u64>, | ||
if_match: ETag, | ||
preferred_part_size: usize, | ||
part_queue_producer: PartQueueProducer<ObjectClientError<GetObjectError, Client::ClientError>>, | ||
) { | ||
assert!(preferred_part_size > 0); | ||
let get_object_result = match self.client.get_object(bucket, key, Some(range), Some(if_match)).await { | ||
> **Review comment:** This is moved from … [comment truncated in page capture]
||
Ok(get_object_result) => get_object_result, | ||
Err(e) => { | ||
error!(error=?e, "GetObject request failed"); | ||
part_queue_producer.push(Err(e)); | ||
return; | ||
} | ||
}; | ||
|
||
pin_mut!(get_object_result); | ||
loop { | ||
match get_object_result.next().await { | ||
Some(Ok((offset, body))) => { | ||
trace!(offset, length = body.len(), "received GetObject part"); | ||
// pre-split the body into multiple parts as suggested by preferred part size | ||
// in order to avoid validating checksum on large parts at read. | ||
let mut body: Bytes = body.into(); | ||
let mut curr_offset = offset; | ||
loop { | ||
let chunk_size = preferred_part_size.min(body.len()); | ||
if chunk_size == 0 { | ||
break; | ||
} | ||
let chunk = body.split_to(chunk_size); | ||
// S3 doesn't provide checksum for us if the request range is not aligned to | ||
// object part boundaries, so we're computing our own checksum here. | ||
let checksum = crc32c::checksum(&chunk); | ||
let checksum_bytes = ChecksummedBytes::new(chunk, checksum); | ||
let part = Part::new(key, curr_offset, checksum_bytes); | ||
curr_offset += part.len() as u64; | ||
part_queue_producer.push(Ok(part)); | ||
} | ||
} | ||
Some(Err(e)) => { | ||
error!(error=?e, "GetObject body part failed"); | ||
part_queue_producer.push(Err(e)); | ||
break; | ||
} | ||
None => break, | ||
} | ||
} | ||
trace!("request finished"); | ||
} | ||
|
||
fn get_aligned_request_size(&self, offset: u64, preferred_length: usize) -> usize { | ||
// If the request size is bigger than a part size we will try to align it to part boundaries. | ||
let part_alignment = self.client.part_size().unwrap_or(8 * 1024 * 1024); | ||
let offset_in_part = (offset % part_alignment as u64) as usize; | ||
> **Review comment:** Moved from … [comment truncated in page capture]
||
if offset_in_part != 0 { | ||
// if the offset is not at the start of the part we will drain all the bytes from that part first | ||
let remaining_in_part = part_alignment - offset_in_part; | ||
preferred_length.min(remaining_in_part) | ||
} else { | ||
// if the request size is smaller than the part size, just return that value | ||
if preferred_length < part_alignment { | ||
preferred_length | ||
} else { | ||
// if it exceeds part boundaries, trim it to the part boundaries | ||
let request_boundary = offset + preferred_length as u64; | ||
let remainder = (request_boundary % part_alignment as u64) as usize; | ||
preferred_length - remainder | ||
} | ||
} | ||
} | ||
} |
> **Review comment:** Now we retrieve this directly from the client.