diff --git a/Cargo.lock b/Cargo.lock
index 48c297458..9d031daa4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1213,6 +1213,16 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fs2"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "funty"
 version = "2.0.0"
@@ -1768,6 +1778,12 @@ dependencies = [
  "vcpkg",
 ]
 
+[[package]]
+name = "linked-hash-map"
+version = "0.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.1.4"
@@ -1916,12 +1932,14 @@ dependencies = [
  "ctrlc",
  "dashmap",
  "filetime",
+ "fs2",
  "fuser",
  "futures",
  "hdrhistogram",
  "hex",
  "lazy_static",
  "libc",
+ "linked-hash-map",
  "metrics",
  "mountpoint-s3-client",
  "mountpoint-s3-crt",
diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml
index 1d7ea3d20..56c1649b2 100644
--- a/mountpoint-s3/Cargo.toml
+++ b/mountpoint-s3/Cargo.toml
@@ -40,6 +40,8 @@ serde = { version = "1.0.190", features = ["derive"] }
 bincode = "1.3.3"
 sha2 = "0.10.6"
 hex = "0.4.3"
+linked-hash-map = "0.5.6"
+fs2 = "0.4.3"
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { version = "0.15.1", default-features = false }
diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs
index e8dd730ce..52f449b20 100644
--- a/mountpoint-s3/src/data_cache.rs
+++ b/mountpoint-s3/src/data_cache.rs
@@ -11,7 +11,7 @@ use mountpoint_s3_client::types::ETag;
 use thiserror::Error;
 
 pub use crate::checksums::ChecksummedBytes;
-pub use crate::data_cache::disk_data_cache::DiskDataCache;
+pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache};
 pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
 
 /// Struct representing a key for accessing an entry in a [DataCache].
diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index c69c82957..2805e5f98 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -1,18 +1,20 @@
 //! Module for the on-disk data cache implementation.
 
 use std::fs;
-use std::io::{ErrorKind, Read, Write};
-use std::path::PathBuf;
+use std::io::{ErrorKind, Read, Seek, Write};
+use std::path::{Path, PathBuf};
 
 use bytes::Bytes;
+use linked_hash_map::LinkedHashMap;
 use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
 use thiserror::Error;
-use tracing::{error, trace};
+use tracing::{error, trace, warn};
 
 use crate::checksums::IntegrityError;
 use crate::data_cache::DataCacheError;
+use crate::sync::Mutex;
 
 use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult};
 
@@ -26,6 +28,16 @@ const HASHED_DIR_SPLIT_INDEX: usize = 2;
 pub struct DiskDataCache {
     block_size: u64,
     cache_directory: PathBuf,
+    limit: CacheLimit,
+    usage: Option<Mutex<UsageInfo<DiskBlockKey>>>,
+}
+
+/// Limit the cache size.
+#[derive(Debug)]
+pub enum CacheLimit {
+    Unbounded,
+    TotalSize { max_size: usize },
+    AvailableSpace { min_ratio: f64 },
 }
 
 /// Describes additional information about the data stored in the block.
@@ -148,54 +160,33 @@ impl DiskBlock {
 
 impl DiskDataCache {
     /// Create a new instance of an [DiskDataCache] with the specified `block_size`.
-    pub fn new(cache_directory: PathBuf, block_size: u64) -> Self {
+    pub fn new(cache_directory: PathBuf, block_size: u64, limit: CacheLimit) -> Self {
+        let usage = match limit {
+            CacheLimit::Unbounded => None,
+            CacheLimit::TotalSize { .. } | CacheLimit::AvailableSpace { .. } => Some(Mutex::new(UsageInfo::new())),
+        };
         DiskDataCache {
             block_size,
             cache_directory,
+            limit,
+            usage,
         }
     }
 
     /// Get the relative path for the given block.
-    fn get_path_for_key(&self, cache_key: &CacheKey) -> PathBuf {
+    fn get_path_for_block_key(&self, block_key: &DiskBlockKey) -> PathBuf {
         let mut path = self.cache_directory.join(CACHE_VERSION);
-
-        // An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length.
-        // Instead, the path contains a hash of the S3 key and ETag.
-        // The risk of collisions is mitigated as we ignore blocks read that contain the wrong S3 key, etc..
-        let hashed_cache_key = hash_cache_key(cache_key);
-
-        // Split directories by taking the first few chars of hash to avoid hitting any FS-specific maximum number of directory entries.
-        let (first, second) = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
-        path.push(first);
-        path.push(second);
-
+        block_key.append_to_path(&mut path);
         path
     }
 
-    /// Get path for the given block.
-    fn get_path_for_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> PathBuf {
-        let mut path = self.get_path_for_key(cache_key);
-        path.push(format!("{}.block", block_idx));
-        path
-    }
-}
-
-/// Hash the cache key using its fields as well as the [CACHE_VERSION],
-/// returning the hash encoded as hexadecimal in a new [String].
-fn hash_cache_key(cache_key: &CacheKey) -> String {
-    let CacheKey { s3_key, etag } = cache_key;
-
-    let mut hasher = Sha256::new();
-    hasher.update(CACHE_VERSION.as_bytes());
-    hasher.update(s3_key.as_bytes());
-    hasher.update(etag.as_str().as_bytes());
-    hex::encode(hasher.finalize())
-}
-
-impl DataCache for DiskDataCache {
-    fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>> {
-        let path = self.get_path_for_block(cache_key, block_idx);
-        let mut file = match fs::File::open(&path) {
+    fn read_block(
+        &self,
+        path: impl AsRef<Path>,
+        cache_key: &CacheKey,
+        block_idx: BlockIndex,
+    ) -> DataCacheResult<Option<ChecksummedBytes>> {
+        let mut file = match fs::File::open(path.as_ref()) {
             Ok(file) => file,
             Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
             Err(err) => return Err(err.into()),
@@ -205,7 +196,7 @@ impl DataCache for DiskDataCache {
         file.read_exact(&mut block_version)?;
         if block_version != CACHE_VERSION.as_bytes() {
             error!(
-                found_version = ?block_version, expected_version = ?CACHE_VERSION, ?path,
+                found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
                 "stale block format found during reading"
             );
             return Err(DataCacheError::InvalidBlockContent);
@@ -227,17 +218,14 @@ impl DataCache for DiskDataCache {
         Ok(Some(bytes))
     }
 
-    fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> {
-        let path = self.get_path_for_block(&cache_key, block_idx);
-        trace!(?cache_key, ?path, "new block will be created in disk cache");
-        let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
+    fn write_block(&self, path: impl AsRef<Path>, block: DiskBlock) -> DataCacheResult<usize> {
+        let cache_path_for_key = path
+            .as_ref()
+            .parent()
+            .expect("path should include cache key in directory name");
         fs::create_dir_all(cache_path_for_key)?;
 
-        let block = DiskBlock::new(cache_key, block_idx, bytes).map_err(|err| match err {
-            DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
-        })?;
-
-        let mut file = fs::File::create(path)?;
+        let mut file = fs::File::create(path.as_ref())?;
         file.write_all(CACHE_VERSION.as_bytes())?;
         let serialize_result = bincode::serialize_into(&mut file, &block);
         if let Err(err) = serialize_result {
@@ -246,6 +234,97 @@
                 _ => Err(DataCacheError::InvalidBlockContent),
             };
         };
+        Ok(file.stream_position()? as usize)
+    }
+
+    fn is_limit_exceeded(&self, size: usize) -> bool {
+        match self.limit {
+            CacheLimit::Unbounded => false,
+            CacheLimit::TotalSize { max_size } => size > max_size,
+            CacheLimit::AvailableSpace { min_ratio } => {
+                let stats = match fs2::statvfs(&self.cache_directory) {
+                    Ok(stats) => stats,
+                    Err(error) => {
+                        warn!(?error, "unable to determine available space");
+                        return false;
+                    }
+                };
+                (stats.available_space() as f64) < min_ratio * (stats.total_space() as f64)
+            }
+        }
+    }
+
+    fn evict_if_needed(&self) {
+        let Some(usage) = &self.usage else {
+            return;
+        };
+
+        while self.is_limit_exceeded(usage.lock().unwrap().size) {
+            let Some(to_remove) = usage.lock().unwrap().evict_lru() else {
+                error!("cache limit exceeded but nothing to evict");
+                break;
+            };
+            let path_to_remove = self.get_path_for_block_key(&to_remove);
+            if let Err(remove_err) = fs::remove_file(&path_to_remove) {
+                error!("unable to remove invalid block: {:?}", remove_err);
+            }
+        }
+    }
+}
+
+/// Hash the cache key using its fields as well as the [CACHE_VERSION].
+fn hash_cache_key_raw(cache_key: &CacheKey) -> [u8; 32] {
+    let CacheKey { s3_key, etag } = cache_key;
+
+    let mut hasher = Sha256::new();
+    hasher.update(CACHE_VERSION.as_bytes());
+    hasher.update(s3_key);
+    hasher.update(etag.as_str());
+    hasher.finalize().into()
+}
+
+impl DataCache for DiskDataCache {
+    fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>> {
+        let block_key = DiskBlockKey::new(cache_key, block_idx);
+        let path = self.get_path_for_block_key(&block_key);
+        match self.read_block(&path, cache_key, block_idx) {
+            Ok(None) => Ok(None),
+            Ok(Some(bytes)) => {
+                if let Some(usage) = &self.usage {
+                    usage.lock().unwrap().refresh(&block_key);
+                }
+                Ok(Some(bytes))
+            }
+            Err(err) => {
+                match fs::remove_file(&path) {
+                    Ok(()) => {
+                        if let Some(usage) = &self.usage {
+                            usage.lock().unwrap().remove(&block_key);
+                        }
+                    }
+                    Err(remove_err) => error!("unable to remove invalid block: {:?}", remove_err),
+                }
+                Err(err)
+            }
+        }
+    }
+
+    fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> {
+        let block_key = DiskBlockKey::new(&cache_key, block_idx);
+        let path = self.get_path_for_block_key(&block_key);
+        trace!(?cache_key, ?path, "new block will be created in disk cache");
+
+        let block = DiskBlock::new(cache_key, block_idx, bytes).map_err(|err| match err {
+            DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
+        })?;
+
+        self.evict_if_needed();
+
+        let size = self.write_block(path, block)?;
+        if let Some(usage) = &self.usage {
+            usage.lock().unwrap().add(block_key, size);
+        }
+
         Ok(())
     }
 
@@ -254,6 +333,86 @@ impl DataCache for DiskDataCache {
     }
 }
 
+/// Key to identify a block in the disk cache, composed of a hash of the S3 key and Etag, and the block index.
+/// An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length.
+/// Instead, the path contains a hash of the S3 key and ETag.
+/// The risk of collisions is mitigated as we ignore blocks read that contain the wrong S3 key, etc..
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+struct DiskBlockKey {
+    hashed_key: [u8; 32],
+    block_index: BlockIndex,
+}
+
+impl DiskBlockKey {
+    fn new(cache_key: &CacheKey, block_index: BlockIndex) -> Self {
+        let hashed_key = hash_cache_key_raw(cache_key);
+        Self {
+            hashed_key,
+            block_index,
+        }
+    }
+
+    fn hex_key(&self) -> String {
+        hex::encode(self.hashed_key)
+    }
+
+    fn append_to_path(&self, path: &mut PathBuf) {
+        let hashed_cache_key = self.hex_key();
+
+        // Split directories by taking the first few chars of hash to avoid hitting any FS-specific maximum number of directory entries.
+        let (first, second) = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
+        path.push(first);
+        path.push(second);
+
+        // Append the block index.
+        path.push(format!("{}.block", self.block_index));
+    }
+}
+
+/// Keeps track of entries usage and total size.
+struct UsageInfo<K> {
+    entries: LinkedHashMap<K, usize>,
+    size: usize,
+}
+
+impl<K> UsageInfo<K>
+where
+    K: std::hash::Hash + Eq + std::fmt::Debug,
+{
+    fn new() -> Self {
+        Self {
+            entries: LinkedHashMap::new(),
+            size: 0,
+        }
+    }
+
+    fn refresh(&mut self, key: &K) -> bool {
+        self.entries.get_refresh(key).is_some()
+    }
+
+    fn add(&mut self, key: K, size: usize) {
+        if let Some(previous_size) = self.entries.insert(key, size) {
+            self.size = self.size.saturating_sub(previous_size);
+        }
+
+        self.size = self.size.saturating_add(size);
+    }
+
+    fn remove(&mut self, key: &K) {
+        if let Some(size) = self.entries.remove(key) {
+            self.size = self.size.saturating_sub(size);
+        }
+    }
+
+    fn evict_lru(&mut self) -> Option<K> {
+        let Some((key, size)) = self.entries.pop_front() else {
+            return None;
+        };
+        self.size = self.size.saturating_sub(size);
+        Some(key)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::ffi::OsString;
@@ -262,6 +421,8 @@ mod tests {
 
     use super::*;
     use mountpoint_s3_client::types::ETag;
+    use rand::{Rng, SeedableRng};
+    use rand_chacha::ChaCha20Rng;
 
     #[test]
     fn test_block_format_version_requires_update() {
@@ -279,12 +440,12 @@
         let serialized_bytes = bincode::serialize(&block).unwrap();
         assert_eq!(
             expected_bytes, serialized_bytes,
-            "serialzed disk format appears to have changed, version bump required"
+            "serialized disk format appears to have changed, version bump required"
         );
     }
 
     #[test]
-    fn test_hash_cache_key() {
+    fn test_hash_cache_key_raw() {
         let s3_key = "a".repeat(266);
         let etag = ETag::for_tests();
         let key = CacheKey {
@@ -292,38 +453,14 @@
             s3_key: s3_key.to_owned(),
         };
         let expected_hash = "b717d5a78ed63238b0778e7295d83e963758aa54db6e969a822f2b13ce9a3067";
-        assert_eq!(expected_hash, hash_cache_key(&key));
-    }
-
-    #[test]
-    fn test_get_path_for_key() {
-        let cache_dir = PathBuf::from("mountpoint-cache/");
-        let data_cache = DiskDataCache::new(cache_dir, 1024);
-
-        let s3_key = "a".repeat(266);
-
-        let etag = ETag::for_tests();
-        let key = CacheKey {
-            etag,
-            s3_key: s3_key.to_owned(),
-        };
-        let hashed_cache_key = hash_cache_key(&key);
-        let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
-        let expected = vec![
-            "mountpoint-cache",
-            CACHE_VERSION,
-            split_hashed_key.0,
-            split_hashed_key.1,
-        ];
-        let path = data_cache.get_path_for_key(&key);
-        let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
-        assert_eq!(expected, results);
+        let actual_hash = hex::encode(hash_cache_key_raw(&key));
+        assert_eq!(expected_hash, actual_hash);
     }
 
     #[test]
-    fn test_get_path_for_block() {
+    fn get_path_for_block_key() {
         let cache_dir = PathBuf::from("mountpoint-cache/");
-        let data_cache = DiskDataCache::new(cache_dir, 1024);
+        let data_cache = DiskDataCache::new(cache_dir, 1024, CacheLimit::Unbounded);
 
         let s3_key = "a".repeat(266);
 
@@ -332,7 +469,8 @@
             etag,
             s3_key: s3_key.to_owned(),
         };
-        let hashed_cache_key = hash_cache_key(&key);
+        let block_key = DiskBlockKey::new(&key, 5);
+        let hashed_cache_key = hex::encode(hash_cache_key_raw(&key));
         let split_hashed_key = hashed_cache_key.split_at(HASHED_DIR_SPLIT_INDEX);
         let expected = vec![
             "mountpoint-cache",
@@ -341,7 +479,7 @@
             split_hashed_key.1,
             "5.block",
         ];
-        let path = data_cache.get_path_for_block(&key, 5);
+        let path = data_cache.get_path_for_block_key(&block_key);
         let results: Vec<OsString> = path.iter().map(ToOwned::to_owned).collect();
         assert_eq!(expected, results);
     }
@@ -353,7 +491,7 @@
         let data_3 = ChecksummedBytes::from_bytes("Baz".into());
 
         let cache_directory = tempfile::tempdir().unwrap();
-        let cache = DiskDataCache::new(cache_directory.into_path(), 8 * 1024 * 1024);
+        let cache = DiskDataCache::new(cache_directory.into_path(), 8 * 1024 * 1024, CacheLimit::Unbounded);
         let cache_key_1 = CacheKey {
             s3_key: "a".into(),
             etag: ETag::for_tests(),
@@ -426,7 +564,7 @@
         let slice = data.slice(1..5);
 
         let cache_directory = tempfile::tempdir().unwrap();
-        let cache = DiskDataCache::new(cache_directory.into_path(), 8 * 1024 * 1024);
+        let cache = DiskDataCache::new(cache_directory.into_path(), 8 * 1024 * 1024, CacheLimit::Unbounded);
         let cache_key = CacheKey {
             s3_key: "a".into(),
             etag: ETag::for_tests(),
@@ -446,6 +584,107 @@
         );
     }
 
+    #[test]
+    fn test_eviction() {
+        fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
+            let mut rng = ChaCha20Rng::seed_from_u64(seed + size as u64);
+            let mut body = vec![0u8; size];
+            rng.fill(&mut body[..]);
+
+            ChecksummedBytes::from_bytes(body.into())
+        }
+
+        fn is_block_in_cache(
+            cache: &DiskDataCache,
+            cache_key: &CacheKey,
+            block_idx: u64,
+            expected_bytes: &ChecksummedBytes,
+        ) -> bool {
+            if let Some(retrieved) = cache
+                .get_block(cache_key, block_idx)
+                .expect("cache should be accessible")
+            {
+                assert_eq!(
+                    retrieved.clone().into_bytes().expect("retrieved bytes should be valid"),
+                    expected_bytes
+                        .clone()
+                        .into_bytes()
+                        .expect("original bytes should be valid")
+                );
+                true
+            } else {
+                false
+            }
+        }
+
+        const BLOCK_SIZE: usize = 100 * 1024;
+        const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
+        const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
+        const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;
+
+        let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
+        let large_object_blocks: Vec<_> = (0..large_object.len())
+            .step_by(BLOCK_SIZE)
+            .map(|offset| large_object.slice(offset..(large_object.len().min(offset + BLOCK_SIZE))))
+            .collect();
+        let large_object_key = CacheKey {
+            s3_key: "large".into(),
+            etag: ETag::for_tests(),
+        };
+
+        let small_object = create_random(0x23456789, SMALL_OBJECT_SIZE);
+        let small_object_blocks: Vec<_> = (0..small_object.len())
+            .step_by(BLOCK_SIZE)
+            .map(|offset| small_object.slice(offset..(small_object.len().min(offset + BLOCK_SIZE))))
+            .collect();
+        let small_object_key = CacheKey {
+            s3_key: "small".into(),
+            etag: ETag::for_tests(),
+        };
+
+        let cache_directory = tempfile::tempdir().unwrap();
+        let cache = DiskDataCache::new(
+            cache_directory.into_path(),
+            BLOCK_SIZE as u64,
+            CacheLimit::TotalSize { max_size: CACHE_LIMIT },
CACHE_LIMIT }, + ); + + // Put all of large_object + for (block_idx, bytes) in large_object_blocks.iter().enumerate() { + cache + .put_block(large_object_key.clone(), block_idx as u64, bytes.clone()) + .unwrap(); + } + + // Put all of small_object + for (block_idx, bytes) in small_object_blocks.iter().enumerate() { + cache + .put_block(small_object_key.clone(), block_idx as u64, bytes.clone()) + .unwrap(); + } + + let count_small_object_blocks_in_cache = small_object_blocks + .iter() + .enumerate() + .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes)) + .count(); + assert_eq!( + count_small_object_blocks_in_cache, + small_object_blocks.len(), + "All blocks for small object should still be in the cache" + ); + + let count_large_object_blocks_in_cache = large_object_blocks + .iter() + .enumerate() + .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes)) + .count(); + assert!( + count_large_object_blocks_in_cache < large_object_blocks.len(), + "Some blocks for the large object should have been evicted" + ); + } + #[test] fn data_block_extract_checks() { let data_1 = ChecksummedBytes::from_bytes("Foo".into()); diff --git a/mountpoint-s3/src/main.rs b/mountpoint-s3/src/main.rs index a652c8d8d..8f86a1459 100644 --- a/mountpoint-s3/src/main.rs +++ b/mountpoint-s3/src/main.rs @@ -261,6 +261,30 @@ struct CliArgs { )] pub data_cache_block_size: u64, + // TODO: Temporary for testing. Review before exposing outside "caching" feature. + #[cfg(feature = "caching")] + #[clap( + long, + help = "Maximum size for the data cache in MB", + value_parser = value_parser!(u64).range(1..), + help_heading = CACHING_OPTIONS_HEADER, + requires = "data_caching_directory", + conflicts_with = "data_cache_free_space", + )] + pub data_cache_size_limit: Option, + + // TODO: Temporary for testing. Review before exposing outside "caching" feature. + #[cfg(feature = "caching")] + #[clap( + long, + help = "Minimum available space to maintain (%)", + value_parser = value_parser!(u64).range(0..100), + help_heading = CACHING_OPTIONS_HEADER, + requires = "data_caching_directory", + conflicts_with = "data_cache_size_limit", + )] + pub data_cache_free_space: Option, + #[clap( long, help = "Configure a string to be prepended to the 'User-Agent' HTTP request header for all S3 requests", @@ -559,7 +583,7 @@ fn mount(args: CliArgs) -> anyhow::Result { #[cfg(feature = "caching")] { - use mountpoint_s3::data_cache::DiskDataCache; + use mountpoint_s3::data_cache::{CacheLimit, DiskDataCache}; use mountpoint_s3::fs::CacheConfig; use mountpoint_s3::prefetch::caching_prefetch; @@ -574,7 +598,19 @@ fn mount(args: CliArgs) -> anyhow::Result { } if let Some(path) = args.data_caching_directory { - let cache = DiskDataCache::new(path, args.data_cache_block_size); + let limit = { + if let Some(max_size_in_mb) = args.data_cache_size_limit { + let max_size = (max_size_in_mb * 1024 * 1024) as usize; + CacheLimit::TotalSize { max_size } + } else if let Some(per_cent) = args.data_cache_free_space { + let min_ratio = (per_cent as f64) * 0.01; + CacheLimit::AvailableSpace { min_ratio } + } else { + CacheLimit::Unbounded + } + }; + + let cache = DiskDataCache::new(path, args.data_cache_block_size, limit); let prefetcher = caching_prefetch(cache, runtime, prefetcher_config); return create_filesystem( client,
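
A standalone sketch of the LRU bookkeeping idea that the new UsageInfo type relies on (illustrative only, not code from this patch; the type and function names below are made up): linked-hash-map preserves insertion order, get_refresh moves an entry to the back of that order on access, and pop_front therefore always yields the least-recently-used entry, which is what evict_if_needed removes until the configured CacheLimit is satisfied.

use linked_hash_map::LinkedHashMap;

/// Minimal LRU size accounting, analogous in spirit to the patch's UsageInfo.
struct LruUsage {
    entries: LinkedHashMap<String, usize>, // key -> entry size in bytes
    total: usize,                          // running total of all entry sizes
}

impl LruUsage {
    fn new() -> Self {
        Self {
            entries: LinkedHashMap::new(),
            total: 0,
        }
    }

    /// Record (or replace) an entry and keep the running total consistent.
    fn add(&mut self, key: String, size: usize) {
        if let Some(previous) = self.entries.insert(key, size) {
            self.total -= previous;
        }
        self.total += size;
    }

    /// Mark an entry as recently used; get_refresh moves it to the back of the order.
    fn touch(&mut self, key: &str) -> bool {
        self.entries.get_refresh(key).is_some()
    }

    /// Remove the least-recently-used entry (the front of the map), if any.
    fn evict_lru(&mut self) -> Option<(String, usize)> {
        let (key, size) = self.entries.pop_front()?;
        self.total -= size;
        Some((key, size))
    }
}

fn main() {
    let mut usage = LruUsage::new();
    usage.add("block-0".to_string(), 10);
    usage.add("block-1".to_string(), 20);
    usage.touch("block-0"); // "block-0" becomes most recently used
    assert_eq!(usage.evict_lru(), Some(("block-1".to_string(), 20)));
    assert_eq!(usage.total, 10);
}

The patch applies the same idea keyed by DiskBlockKey, with each entry's size taken from the number of bytes written for the corresponding block file.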