Introduce new abstraction between the prefetcher and GetObject calls #552

Merged (1 commit) on Oct 16, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions mountpoint-s3/Cargo.toml
@@ -13,6 +13,7 @@ mountpoint-s3-crt = { path = "../mountpoint-s3-crt", version = "0.3.0" }
anyhow = { version = "1.0.64", features = ["backtrace"] }
async-channel = "1.8.0"
async-lock = "2.6.0"
async-trait = "0.1.57"
bytes = "1.2.1"
clap = { version = "4.1.9", features = ["derive"] }
crc32c = "0.6.3"
1 change: 0 additions & 1 deletion mountpoint-s3/src/main.rs
@@ -461,7 +461,6 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
filesystem_config.file_mode = file_mode;
}
filesystem_config.storage_class = args.storage_class;
filesystem_config.prefetcher_config.part_alignment = args.part_size as usize;
filesystem_config.allow_delete = args.allow_delete;

let fs = S3FuseFilesystem::new(client, runtime, &args.bucket_name, &prefix, filesystem_config);
93 changes: 19 additions & 74 deletions mountpoint-s3/src/prefetch.rs
@@ -8,27 +8,25 @@
//! non-sequential read, we abandon the prefetching and start again with the minimum request size.

pub mod checksummed_bytes;
mod feed;
mod part;
mod part_queue;

use std::collections::VecDeque;
use std::fmt::Debug;
use std::time::Duration;

use bytes::Bytes;
use futures::future::RemoteHandle;
use futures::pin_mut;
use futures::stream::StreamExt;
use futures::task::{Spawn, SpawnExt};
use metrics::counter;
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_crt::checksums::crc32c;
use thiserror::Error;
use tracing::{debug_span, error, trace, Instrument};

use crate::prefetch::checksummed_bytes::{ChecksummedBytes, IntegrityError};
use crate::prefetch::feed::{ClientPartFeed, ObjectPartFeed};
use crate::prefetch::part::Part;
use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue};
use crate::sync::{Arc, RwLock};
@@ -45,8 +43,6 @@ pub struct PrefetcherConfig {
pub sequential_prefetch_multiplier: usize,
/// Timeout to wait for a part to become available
pub read_timeout: Duration,
/// The size of the parts that the prefetcher is trying to align with
pub part_alignment: usize,
Contributor Author: Now we retrieve this directly from the client.

}

impl Default for PrefetcherConfig {
@@ -65,7 +61,6 @@ impl Default for PrefetcherConfig {
max_request_size: 2 * 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
read_timeout: Duration::from_secs(60),
part_alignment: 8 * 1024 * 1024,
}
}
}
@@ -76,22 +71,28 @@ pub struct Prefetcher<Client, Runtime> {
inner: Arc<PrefetcherInner<Client, Runtime>>,
}

#[derive(Debug)]
struct PrefetcherInner<Client, Runtime> {
client: Arc<Client>,
part_feed: Arc<dyn ObjectPartFeed<Client> + Send + Sync>,
Member: Any thoughts on whether this should be a generic type rather than dynamic dispatch? I guess we can get away with it here (unlike Client) because ObjectPartFeed is object-safe.

Contributor Author: It is a pragmatic choice which we can revert later on. The downside of a new generic type here is that it would propagate up to the root S3FuseFilesystem and end up touching most of the code in this crate.
In fact, I had started working on an alternative option where I replaced ObjectClient with a new ObjectStore trait, with get_object_parts instead of get_object (and allowing for further divergence in the future), but that was a much larger change and would have slowed down work on the caching layer.
I still think something like that would be a preferable solution in the long term, but we can review the approach while we work on the cache or soon after.
(An illustrative sketch of the generic-parameter alternative follows the struct definition below.)

config: PrefetcherConfig,
runtime: Runtime,
}
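Not part of this diff: a minimal sketch of the generic-parameter alternative discussed in the thread above, using simplified stand-in traits. Because `Client` would then appear only in the bound on `Feed`, a `PhantomData` marker is needed, and the extra `Feed` parameter would have to be named by every type that owns a prefetcher, up to S3FuseFilesystem.

```rust
use std::marker::PhantomData;
use std::sync::Arc;

// Simplified stand-ins for the real traits, only to show the shape of the change.
trait ObjectClient {}
trait ObjectPartFeed<Client> {}
struct PrefetcherConfig;

// Generic version: the feed is a concrete type parameter instead of a trait object,
// so calls are monomorphized but `Feed` leaks into every owning type.
struct PrefetcherInner<Client, Feed, Runtime>
where
    Client: ObjectClient,
    Feed: ObjectPartFeed<Client>,
{
    part_feed: Arc<Feed>,
    config: PrefetcherConfig,
    runtime: Runtime,
    // `Client` no longer appears in any field, only in the bound on `Feed`.
    _client: PhantomData<Client>,
}
```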

impl<Client, Runtime> Debug for PrefetcherInner<Client, Runtime> {
Contributor Author: derive(Debug) did not seem to work with part_feed: Arc<dyn ObjectPartFeed<Client>...>. Not completely sure why (wrt client: Arc<Client>) or whether there is a better solution.

Member: It's because the Debug impl for part_feed: Arc<dyn ObjectPartFeed<Client>...> has to work for any ObjectPartFeed (because the concrete type isn't known), which can only be possible if the ObjectPartFeed trait itself guarantees Debug. On the other hand, the Arc<Client> version only has to work for the concrete Client type(s) that we instantiate it with, which all happen to implement Debug.
(A sketch of the Debug-supertrait alternative follows this impl block.)

fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrefetcherInner").field("config", &self.config).finish()
}
}
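Not part of this diff: a minimal sketch of the alternative implied above, requiring Debug as a supertrait of ObjectPartFeed. The derive then works for the trait object, at the cost of every implementation having to be Debug.

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Hypothetical: with Debug as a supertrait, `dyn ObjectPartFeed<Client>` is itself Debug...
trait ObjectPartFeed<Client>: Debug {}

// ...so a stripped-down PrefetcherInner can simply derive Debug again.
#[derive(Debug)]
struct PrefetcherInner<Client, Runtime> {
    part_feed: Arc<dyn ObjectPartFeed<Client> + Send + Sync>,
    runtime: Runtime,
}
```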

impl<Client, Runtime> Prefetcher<Client, Runtime>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn,
{
/// Create a new [Prefetcher] that will make requests to the given client.
pub fn new(client: Arc<Client>, runtime: Runtime, config: PrefetcherConfig) -> Self {
let part_feed = Arc::new(ClientPartFeed::new(client));
let inner = PrefetcherInner {
client,
part_feed,
config,
runtime,
};
@@ -101,7 +102,7 @@ where

/// Start a new get request to the specified object.
pub fn get(&self, bucket: &str, key: &str, size: u64, etag: ETag) -> PrefetchGetObject<Client, Runtime> {
PrefetchGetObject::new(Arc::clone(&self.inner), bucket, key, size, etag)
PrefetchGetObject::new(self.inner.clone(), bucket, key, size, etag)
}
}

@@ -276,55 +277,16 @@ where
trace!(?range, size, "spawning request");

let request_task = {
let client = Arc::clone(&self.inner.client);
let feed = self.inner.part_feed.clone();
let preferred_part_size = self.preferred_part_size;
let bucket = self.bucket.to_owned();
let key = self.key.to_owned();
let etag = self.etag.clone();
let span = debug_span!("prefetch", range=?range);

async move {
match client.get_object(&bucket, &key, Some(range.clone()), Some(etag)).await {
Err(e) => {
error!(error=?e, "RequestTask get object failed");
part_queue_producer.push(Err(e));
}
Ok(request) => {
pin_mut!(request);
loop {
match request.next().await {
Some(Ok((offset, body))) => {
// pre-split the body into multiple parts as suggested by preferred part size
// in order to avoid validating checksum on large parts at read.
assert!(preferred_part_size > 0);
let mut body: Bytes = body.into();
let mut curr_offset = offset;
loop {
let chunk_size = preferred_part_size.min(body.len());
if chunk_size == 0 {
break;
}
let chunk = body.split_to(chunk_size);
// S3 doesn't provide checksum for us if the request range is not aligned to object part boundaries,
// so we're computing our own checksum here.
let checksum = crc32c::checksum(&chunk);
let checksum_bytes = ChecksummedBytes::new(chunk, checksum);
let part = Part::new(&key, curr_offset, checksum_bytes);
curr_offset += part.len() as u64;
part_queue_producer.push(Ok(part));
}
}
Some(Err(e)) => {
error!(error=?e, "RequestTask body part failed");
part_queue_producer.push(Err(e));
break;
}
None => break,
}
}
trace!("request finished");
}
}
feed.get_object_parts(&bucket, &key, range, etag, preferred_part_size, part_queue_producer)
.await
}
.instrument(span)
};
@@ -344,29 +306,14 @@
}

/// Suggest next request size.
/// Normally, next request size is current request size multiply by sequential prefetch multiplier,
/// but if the request size is getting bigger than a part size we will try to align it to part boundaries.
/// The next request size is the current request size multiplied by sequential prefetch multiplier.
fn get_next_request_size(&self) -> usize {
// calculate next request size
let next_request_size = (self.next_request_size * self.inner.config.sequential_prefetch_multiplier)
.min(self.inner.config.max_request_size);

let offset_in_part = (self.next_request_offset % self.inner.config.part_alignment as u64) as usize;
// if the offset is not at the start of the part we will drain all the bytes from that part first
if offset_in_part != 0 {
let remaining_in_part = self.inner.config.part_alignment - offset_in_part;
next_request_size.min(remaining_in_part)
} else {
// if the next request size is smaller than the part size, just return that value
if next_request_size < self.inner.config.part_alignment {
return next_request_size;
}

// if it exceeds part boundaries, trim it to the part boundaries
let next_request_boundary = self.next_request_offset + next_request_size as u64;
let remainder = (next_request_boundary % self.inner.config.part_alignment as u64) as usize;
next_request_size - remainder
}
self.inner
.part_feed
.get_aligned_request_size(self.next_request_offset, next_request_size)
}

/// Reset this prefetch request to a new offset, clearing any existing tasks queued.
@@ -457,7 +404,6 @@ mod tests {
max_request_size: test_config.max_request_size,
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
read_timeout: Duration::from_secs(5),
part_alignment: test_config.client_part_size,
};
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
@@ -608,7 +554,6 @@
sequential_prefetch_multiplier: prefetch_multiplier,
max_request_size,
read_timeout: Duration::from_secs(60),
part_alignment: part_size,
};
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config);
127 changes: 127 additions & 0 deletions mountpoint-s3/src/prefetch/feed.rs
@@ -0,0 +1,127 @@
use std::{fmt::Debug, ops::Range, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::{
error::{GetObjectError, ObjectClientError},
types::ETag,
ObjectClient,
};
use mountpoint_s3_crt::checksums::crc32c;
use tracing::{error, trace};

use crate::prefetch::{checksummed_bytes::ChecksummedBytes, part::Part, part_queue::PartQueueProducer};

/// A generic interface to retrieve data from objects in a S3-like store.
#[async_trait]
pub trait ObjectPartFeed<Client: ObjectClient> {
/// Get the content of an object in fixed size parts. The parts are pushed to the provided `part_sink`
/// and are guaranteed to be contiguous and in the correct order. Callers need to specify a preferred
/// size for the parts, but implementations are allowed to ignore it.
async fn get_object_parts(
&self,
bucket: &str,
key: &str,
range: Range<u64>,
if_match: ETag,
preferred_part_size: usize,
part_sink: PartQueueProducer<ObjectClientError<GetObjectError, Client::ClientError>>,
);

/// Adjust the size of a request to align to optimal part boundaries for this client.
fn get_aligned_request_size(&self, offset: u64, preferred_size: usize) -> usize;
}

/// [ObjectPartFeed] implementation which delegates retrieving object data to a [Client].
#[derive(Debug)]
pub struct ClientPartFeed<Client> {
client: Arc<Client>,
}
Contributor (on lines +36 to +40): Where do we plan to put the cache-based implementation of this? In this file also?
I'm wondering if there's a good way to break down this file.
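Not part of this diff: a purely illustrative skeleton of a cache-backed feed answering the question above. CachedPartFeed, the Cache type parameter, and its placement (e.g. a sibling submodule under prefetch::feed) are hypothetical; the signatures mirror the ObjectPartFeed trait defined in this file, and the sketch simply falls back to the plain client feed.

```rust
use std::ops::Range;

use async_trait::async_trait;
use mountpoint_s3_client::{
    error::{GetObjectError, ObjectClientError},
    types::ETag,
    ObjectClient,
};

use crate::prefetch::feed::{ClientPartFeed, ObjectPartFeed};
use crate::prefetch::part_queue::PartQueueProducer;

/// Hypothetical cache-backed feed: consult a local cache first, otherwise
/// delegate to the plain client feed.
pub struct CachedPartFeed<Client, Cache> {
    inner: ClientPartFeed<Client>,
    cache: Cache,
}

#[async_trait]
impl<Client, Cache> ObjectPartFeed<Client> for CachedPartFeed<Client, Cache>
where
    Client: ObjectClient + Send + Sync + 'static,
    Cache: Send + Sync,
{
    async fn get_object_parts(
        &self,
        bucket: &str,
        key: &str,
        range: Range<u64>,
        if_match: ETag,
        preferred_part_size: usize,
        part_sink: PartQueueProducer<ObjectClientError<GetObjectError, Client::ClientError>>,
    ) {
        // A real implementation would look up `range` in `self.cache` and only fall
        // back to S3 for the pieces that are missing; this sketch always falls back.
        self.inner
            .get_object_parts(bucket, key, range, if_match, preferred_part_size, part_sink)
            .await
    }

    fn get_aligned_request_size(&self, offset: u64, preferred_size: usize) -> usize {
        self.inner.get_aligned_request_size(offset, preferred_size)
    }
}
```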


impl<Client> ClientPartFeed<Client> {
pub fn new(client: Arc<Client>) -> Self {
Self { client }
}
}

#[async_trait]
impl<Client> ObjectPartFeed<Client> for ClientPartFeed<Client>
where
Client: ObjectClient + Send + Sync + 'static,
{
async fn get_object_parts(
&self,
bucket: &str,
key: &str,
range: Range<u64>,
if_match: ETag,
preferred_part_size: usize,
part_queue_producer: PartQueueProducer<ObjectClientError<GetObjectError, Client::ClientError>>,
) {
assert!(preferred_part_size > 0);
let get_object_result = match self.client.get_object(bucket, key, Some(range), Some(if_match)).await {
Contributor Author: This is moved from PrefetchGetObject::spawn_next_request.

Ok(get_object_result) => get_object_result,
Err(e) => {
error!(error=?e, "GetObject request failed");
part_queue_producer.push(Err(e));
return;
}
};

pin_mut!(get_object_result);
loop {
match get_object_result.next().await {
Some(Ok((offset, body))) => {
trace!(offset, length = body.len(), "received GetObject part");
// pre-split the body into multiple parts as suggested by preferred part size
// in order to avoid validating checksum on large parts at read.
let mut body: Bytes = body.into();
let mut curr_offset = offset;
loop {
let chunk_size = preferred_part_size.min(body.len());
if chunk_size == 0 {
break;
}
let chunk = body.split_to(chunk_size);
// S3 doesn't provide checksum for us if the request range is not aligned to
// object part boundaries, so we're computing our own checksum here.
let checksum = crc32c::checksum(&chunk);
let checksum_bytes = ChecksummedBytes::new(chunk, checksum);
let part = Part::new(key, curr_offset, checksum_bytes);
curr_offset += part.len() as u64;
part_queue_producer.push(Ok(part));
}
}
Some(Err(e)) => {
error!(error=?e, "GetObject body part failed");
part_queue_producer.push(Err(e));
break;
}
None => break,
}
}
trace!("request finished");
}

fn get_aligned_request_size(&self, offset: u64, preferred_length: usize) -> usize {
// If the request size is bigger than a part size we will try to align it to part boundaries.
let part_alignment = self.client.part_size().unwrap_or(8 * 1024 * 1024);
let offset_in_part = (offset % part_alignment as u64) as usize;
Contributor Author: Moved from PrefetchGetObject::get_next_request_size.

if offset_in_part != 0 {
// if the offset is not at the start of the part we will drain all the bytes from that part first
let remaining_in_part = part_alignment - offset_in_part;
preferred_length.min(remaining_in_part)
} else {
// if the request size is smaller than the part size, just return that value
if preferred_length < part_alignment {
preferred_length
} else {
// if it exceeds part boundaries, trim it to the part boundaries
let request_boundary = offset + preferred_length as u64;
let remainder = (request_boundary % part_alignment as u64) as usize;
preferred_length - remainder
}
}
}
}
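To make the alignment arithmetic above concrete, here is a small self-contained restatement of the same rule with a few worked values, assuming the 8 MiB fallback part size (illustration only, not part of the diff):

```rust
/// Illustrative restatement of get_aligned_request_size, with a fixed 8 MiB part size.
fn aligned(offset: u64, preferred: usize) -> usize {
    const PART: usize = 8 * 1024 * 1024;
    let offset_in_part = (offset % PART as u64) as usize;
    if offset_in_part != 0 {
        // Not on a part boundary: first drain the rest of the current part.
        preferred.min(PART - offset_in_part)
    } else if preferred < PART {
        // Smaller than a part: leave it alone.
        preferred
    } else {
        // Larger than a part: trim the end back to a part boundary.
        let boundary = offset + preferred as u64;
        preferred - (boundary % PART as u64) as usize
    }
}

fn main() {
    const MIB: usize = 1024 * 1024;
    assert_eq!(aligned(4 * MIB as u64, 16 * MIB), 4 * MIB); // finish the current part first
    assert_eq!(aligned(8 * MIB as u64, MIB), MIB); // aligned offset, small request
    assert_eq!(aligned(8 * MIB as u64, 20 * MIB), 16 * MIB); // trimmed to end at 24 MiB
}
```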