diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index ff0a46533dda..b79042e3cda8 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -698,12 +698,28 @@ pub struct GetOptions { /// Request will succeed if the `ObjectMeta::e_tag` matches /// otherwise returning [`Error::Precondition`] /// - /// + /// See + /// + /// Examples: + /// + /// ```text + /// If-Match: "xyzzy" + /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz" + /// If-Match: * + /// ``` pub if_match: Option, /// Request will succeed if the `ObjectMeta::e_tag` does not match /// otherwise returning [`Error::NotModified`] /// - /// + /// See + /// + /// Examples: + /// + /// ```text + /// If-None-Match: "xyzzy" + /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz" + /// If-None-Match: * + /// ``` pub if_none_match: Option, /// Request will succeed if the object has been modified since /// @@ -730,25 +746,41 @@ pub struct GetOptions { impl GetOptions { /// Returns an error if the modification conditions on this request are not satisfied - fn check_modified( - &self, - location: &Path, - last_modified: DateTime, - ) -> Result<()> { - if let Some(date) = self.if_modified_since { - if last_modified <= date { - return Err(Error::NotModified { - path: location.to_string(), - source: format!("{} >= {}", date, last_modified).into(), + /// + /// + fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> { + // The use of the invalid etag "*" means no ETag is equivalent to never matching + let etag = meta.e_tag.as_deref().unwrap_or("*"); + let last_modified = meta.last_modified; + + if let Some(m) = &self.if_match { + if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) { + return Err(Error::Precondition { + path: meta.location.to_string(), + source: format!("{etag} does not match {m}").into(), }); } - } - - if let Some(date) = self.if_unmodified_since { + } else if let Some(date) = self.if_unmodified_since { if last_modified > date { return Err(Error::Precondition { - path: location.to_string(), - source: format!("{} < {}", date, last_modified).into(), + path: meta.location.to_string(), + source: format!("{date} < {last_modified}").into(), + }); + } + } + + if let Some(m) = &self.if_none_match { + if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) { + return Err(Error::NotModified { + path: meta.location.to_string(), + source: format!("{etag} matches {m}").into(), + }); + } + } else if let Some(date) = self.if_modified_since { + if last_modified <= date { + return Err(Error::NotModified { + path: meta.location.to_string(), + source: format!("{date} >= {last_modified}").into(), }); } } @@ -952,6 +984,7 @@ mod test_util { mod tests { use super::*; use crate::test_util::flatten_list_stream; + use chrono::TimeZone; use rand::{thread_rng, Rng}; use tokio::io::AsyncWriteExt; @@ -1359,33 +1392,32 @@ mod tests { Err(e) => panic!("{e}"), } - if let Some(tag) = meta.e_tag { - let options = GetOptions { - if_match: Some(tag.clone()), - ..GetOptions::default() - }; - storage.get_opts(&path, options).await.unwrap(); - - let options = GetOptions { - if_match: Some("invalid".to_string()), - ..GetOptions::default() - }; - let err = storage.get_opts(&path, options).await.unwrap_err(); - assert!(matches!(err, Error::Precondition { .. }), "{err}"); - - let options = GetOptions { - if_none_match: Some(tag.clone()), - ..GetOptions::default() - }; - let err = storage.get_opts(&path, options).await.unwrap_err(); - assert!(matches!(err, Error::NotModified { .. }), "{err}"); - - let options = GetOptions { - if_none_match: Some("invalid".to_string()), - ..GetOptions::default() - }; - storage.get_opts(&path, options).await.unwrap(); - } + let tag = meta.e_tag.unwrap(); + let options = GetOptions { + if_match: Some(tag.clone()), + ..GetOptions::default() + }; + storage.get_opts(&path, options).await.unwrap(); + + let options = GetOptions { + if_match: Some("invalid".to_string()), + ..GetOptions::default() + }; + let err = storage.get_opts(&path, options).await.unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); + + let options = GetOptions { + if_none_match: Some(tag.clone()), + ..GetOptions::default() + }; + let err = storage.get_opts(&path, options).await.unwrap_err(); + assert!(matches!(err, Error::NotModified { .. }), "{err}"); + + let options = GetOptions { + if_none_match: Some("invalid".to_string()), + ..GetOptions::default() + }; + storage.get_opts(&path, options).await.unwrap(); } /// Returns a chunk of length `chunk_length` @@ -1697,8 +1729,86 @@ mod tests { assert!(stream.next().await.is_none()); } - // Tests TODO: - // GET nonexisting location (in_memory/file) - // DELETE nonexisting location - // PUT overwriting + #[test] + fn test_preconditions() { + let mut meta = ObjectMeta { + location: Path::from("test"), + last_modified: Utc.timestamp_nanos(100), + size: 100, + e_tag: Some("123".to_string()), + }; + + let mut options = GetOptions::default(); + options.check_preconditions(&meta).unwrap(); + + options.if_modified_since = Some(Utc.timestamp_nanos(50)); + options.check_preconditions(&meta).unwrap(); + + options.if_modified_since = Some(Utc.timestamp_nanos(100)); + options.check_preconditions(&meta).unwrap_err(); + + options.if_modified_since = Some(Utc.timestamp_nanos(101)); + options.check_preconditions(&meta).unwrap_err(); + + options = GetOptions::default(); + + options.if_unmodified_since = Some(Utc.timestamp_nanos(50)); + options.check_preconditions(&meta).unwrap_err(); + + options.if_unmodified_since = Some(Utc.timestamp_nanos(100)); + options.check_preconditions(&meta).unwrap(); + + options.if_unmodified_since = Some(Utc.timestamp_nanos(101)); + options.check_preconditions(&meta).unwrap(); + + options = GetOptions::default(); + + options.if_match = Some("123".to_string()); + options.check_preconditions(&meta).unwrap(); + + options.if_match = Some("123,354".to_string()); + options.check_preconditions(&meta).unwrap(); + + options.if_match = Some("354, 123,".to_string()); + options.check_preconditions(&meta).unwrap(); + + options.if_match = Some("354".to_string()); + options.check_preconditions(&meta).unwrap_err(); + + options.if_match = Some("*".to_string()); + options.check_preconditions(&meta).unwrap(); + + // If-Match takes precedence + options.if_unmodified_since = Some(Utc.timestamp_nanos(200)); + options.check_preconditions(&meta).unwrap(); + + options = GetOptions::default(); + + options.if_none_match = Some("123".to_string()); + options.check_preconditions(&meta).unwrap_err(); + + options.if_none_match = Some("*".to_string()); + options.check_preconditions(&meta).unwrap_err(); + + options.if_none_match = Some("1232".to_string()); + options.check_preconditions(&meta).unwrap(); + + options.if_none_match = Some("23, 123".to_string()); + options.check_preconditions(&meta).unwrap_err(); + + // If-None-Match takes precedence + options.if_modified_since = Some(Utc.timestamp_nanos(10)); + options.check_preconditions(&meta).unwrap_err(); + + // Check missing ETag + meta.e_tag = None; + options = GetOptions::default(); + + options.if_none_match = Some("*".to_string()); // Fails if any file exists + options.check_preconditions(&meta).unwrap_err(); + + options = GetOptions::default(); + options.if_match = Some("*".to_string()); // Passes if file exists + options.check_preconditions(&meta).unwrap(); + } } diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 3ed63a410815..3d4a02a1e9e9 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -365,23 +365,12 @@ impl ObjectStore for LocalFileSystem { } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - if options.if_match.is_some() || options.if_none_match.is_some() { - return Err(super::Error::NotSupported { - source: "ETags not supported by LocalFileSystem".to_string().into(), - }); - } - let location = location.clone(); let path = self.config.path_to_filesystem(&location)?; maybe_spawn_blocking(move || { let (file, metadata) = open_file(&path)?; - if options.if_unmodified_since.is_some() - || options.if_modified_since.is_some() - { - options.check_modified(&location, last_modified(&metadata))?; - } - let meta = convert_metadata(metadata, location)?; + options.check_preconditions(&meta)?; Ok(GetResult { payload: GetResultPayload::File(file, path), @@ -965,7 +954,7 @@ fn convert_entry(entry: DirEntry, location: Path) -> Result { convert_metadata(metadata, location) } -fn last_modified(metadata: &std::fs::Metadata) -> DateTime { +fn last_modified(metadata: &Metadata) -> DateTime { metadata .modified() .expect("Modified file time should be supported on this platform") @@ -977,15 +966,35 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result { let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu { path: location.as_ref(), })?; + let inode = get_inode(&metadata); + let mtime = last_modified.timestamp_micros(); + + // Use an ETag scheme based on that used by many popular HTTP servers + // + // + let etag = format!("{inode:x}-{mtime:x}-{size:x}"); Ok(ObjectMeta { location, last_modified, size, - e_tag: None, + e_tag: Some(etag), }) } +#[cfg(unix)] +/// We include the inode when available to yield an ETag more resistant to collisions +/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag) +fn get_inode(metadata: &Metadata) -> u64 { + std::os::unix::fs::MetadataExt::ino(metadata) +} + +#[cfg(not(unix))] +/// On platforms where an inode isn't available, fallback to just relying on size and mtime +fn get_inode(metadata: &Metadata) -> u64 { + 0 +} + /// Convert walkdir results and converts not-found errors into `None`. /// Convert broken symlinks to `None`. fn convert_walkdir_result( diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 0e229885b006..f638ed6d7a55 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -35,9 +35,6 @@ use std::sync::Arc; use std::task::Poll; use tokio::io::AsyncWrite; -type Entry = (Bytes, DateTime); -type StorageType = Arc>>; - /// A specialized `Error` for in-memory object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -80,7 +77,41 @@ impl From for super::Error { /// storage provider. #[derive(Debug, Default)] pub struct InMemory { - storage: StorageType, + storage: SharedStorage, +} + +#[derive(Debug, Clone)] +struct Entry { + data: Bytes, + last_modified: DateTime, + e_tag: usize, +} + +impl Entry { + fn new(data: Bytes, last_modified: DateTime, e_tag: usize) -> Self { + Self { + data, + last_modified, + e_tag, + } + } +} + +#[derive(Debug, Default, Clone)] +struct Storage { + next_etag: usize, + map: BTreeMap, +} + +type SharedStorage = Arc>; + +impl Storage { + fn insert(&mut self, location: &Path, bytes: Bytes) { + let etag = self.next_etag; + self.next_etag += 1; + let entry = Entry::new(bytes, Utc::now(), etag); + self.map.insert(location.clone(), entry); + } } impl std::fmt::Display for InMemory { @@ -92,9 +123,7 @@ impl std::fmt::Display for InMemory { #[async_trait] impl ObjectStore for InMemory { async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - self.storage - .write() - .insert(location.clone(), (bytes, Utc::now())); + self.storage.write().insert(location, bytes); Ok(()) } @@ -128,33 +157,30 @@ impl ObjectStore for InMemory { Ok(Box::new(InMemoryAppend { location: location.clone(), data: Vec::::new(), - storage: StorageType::clone(&self.storage), + storage: SharedStorage::clone(&self.storage), })) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - if options.if_match.is_some() || options.if_none_match.is_some() { - return Err(super::Error::NotSupported { - source: "ETags not supported by InMemory".to_string().into(), - }); - } - let (data, last_modified) = self.entry(location).await?; - options.check_modified(location, last_modified)?; + let entry = self.entry(location).await?; + let e_tag = entry.e_tag.to_string(); + let meta = ObjectMeta { location: location.clone(), - last_modified, - size: data.len(), - e_tag: None, + last_modified: entry.last_modified, + size: entry.data.len(), + e_tag: Some(e_tag), }; + options.check_preconditions(&meta)?; let (range, data) = match options.range { Some(range) => { - let len = data.len(); + let len = entry.data.len(); ensure!(range.end <= len, OutOfRangeSnafu { range, len }); ensure!(range.start <= range.end, BadRangeSnafu { range }); - (range.clone(), data.slice(range)) + (range.clone(), entry.data.slice(range)) } - None => (0..data.len(), data), + None => (0..entry.data.len(), entry.data), }; let stream = futures::stream::once(futures::future::ready(Ok(data))); @@ -170,15 +196,18 @@ impl ObjectStore for InMemory { location: &Path, ranges: &[Range], ) -> Result> { - let data = self.entry(location).await?; + let entry = self.entry(location).await?; ranges .iter() .map(|range| { let range = range.clone(); - let len = data.0.len(); - ensure!(range.end <= data.0.len(), OutOfRangeSnafu { range, len }); + let len = entry.data.len(); + ensure!( + range.end <= entry.data.len(), + OutOfRangeSnafu { range, len } + ); ensure!(range.start <= range.end, BadRangeSnafu { range }); - Ok(data.0.slice(range)) + Ok(entry.data.slice(range)) }) .collect() } @@ -188,14 +217,14 @@ impl ObjectStore for InMemory { Ok(ObjectMeta { location: location.clone(), - last_modified: entry.1, - size: entry.0.len(), - e_tag: None, + last_modified: entry.last_modified, + size: entry.data.len(), + e_tag: Some(entry.e_tag.to_string()), }) } async fn delete(&self, location: &Path) -> Result<()> { - self.storage.write().remove(location); + self.storage.write().map.remove(location); Ok(()) } @@ -208,6 +237,7 @@ impl ObjectStore for InMemory { let storage = self.storage.read(); let values: Vec<_> = storage + .map .range((prefix)..) .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref())) .filter(|(key, _)| { @@ -219,9 +249,9 @@ impl ObjectStore for InMemory { .map(|(key, value)| { Ok(ObjectMeta { location: key.clone(), - last_modified: value.1, - size: value.0.len(), - e_tag: None, + last_modified: value.last_modified, + size: value.data.len(), + e_tag: Some(value.e_tag.to_string()), }) }) .collect(); @@ -241,7 +271,7 @@ impl ObjectStore for InMemory { // Only objects in this base level should be returned in the // response. Otherwise, we just collect the common prefixes. let mut objects = vec![]; - for (k, v) in self.storage.read().range((prefix)..) { + for (k, v) in self.storage.read().map.range((prefix)..) { if !k.as_ref().starts_with(prefix.as_ref()) { break; } @@ -263,9 +293,9 @@ impl ObjectStore for InMemory { } else { let object = ObjectMeta { location: k.clone(), - last_modified: v.1, - size: v.0.len(), - e_tag: None, + last_modified: v.last_modified, + size: v.data.len(), + e_tag: Some(v.e_tag.to_string()), }; objects.push(object); } @@ -278,23 +308,21 @@ impl ObjectStore for InMemory { } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - let data = self.entry(from).await?; - self.storage - .write() - .insert(to.clone(), (data.0, Utc::now())); + let entry = self.entry(from).await?; + self.storage.write().insert(to, entry.data); Ok(()) } async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let data = self.entry(from).await?; + let entry = self.entry(from).await?; let mut storage = self.storage.write(); - if storage.contains_key(to) { + if storage.map.contains_key(to) { return Err(Error::AlreadyExists { path: to.to_string(), } .into()); } - storage.insert(to.clone(), (data.0, Utc::now())); + storage.insert(to, entry.data); Ok(()) } } @@ -319,9 +347,10 @@ impl InMemory { self.fork() } - async fn entry(&self, location: &Path) -> Result<(Bytes, DateTime)> { + async fn entry(&self, location: &Path) -> Result { let storage = self.storage.read(); let value = storage + .map .get(location) .cloned() .context(NoDataInMemorySnafu { @@ -335,7 +364,7 @@ impl InMemory { struct InMemoryUpload { location: Path, data: Vec, - storage: StorageType, + storage: Arc>, } impl AsyncWrite for InMemoryUpload { @@ -343,7 +372,7 @@ impl AsyncWrite for InMemoryUpload { mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, buf: &[u8], - ) -> std::task::Poll> { + ) -> Poll> { self.data.extend_from_slice(buf); Poll::Ready(Ok(buf.len())) } @@ -351,18 +380,16 @@ impl AsyncWrite for InMemoryUpload { fn poll_flush( self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn poll_shutdown( mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { let data = Bytes::from(std::mem::take(&mut self.data)); - self.storage - .write() - .insert(self.location.clone(), (data, Utc::now())); + self.storage.write().insert(&self.location, data); Poll::Ready(Ok(())) } } @@ -370,7 +397,7 @@ impl AsyncWrite for InMemoryUpload { struct InMemoryAppend { location: Path, data: Vec, - storage: StorageType, + storage: Arc>, } impl AsyncWrite for InMemoryAppend { @@ -378,7 +405,7 @@ impl AsyncWrite for InMemoryAppend { mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, buf: &[u8], - ) -> std::task::Poll> { + ) -> Poll> { self.data.extend_from_slice(buf); Poll::Ready(Ok(buf.len())) } @@ -386,20 +413,18 @@ impl AsyncWrite for InMemoryAppend { fn poll_flush( mut self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let storage = StorageType::clone(&self.storage); + ) -> Poll> { + let storage = Arc::clone(&self.storage); let mut writer = storage.write(); - if let Some((bytes, _)) = writer.remove(&self.location) { + if let Some(entry) = writer.map.remove(&self.location) { let buf = std::mem::take(&mut self.data); - let concat = Bytes::from_iter(bytes.into_iter().chain(buf)); - writer.insert(self.location.clone(), (concat, Utc::now())); + let concat = Bytes::from_iter(entry.data.into_iter().chain(buf)); + writer.insert(&self.location, concat); } else { - writer.insert( - self.location.clone(), - (Bytes::from(std::mem::take(&mut self.data)), Utc::now()), - ); + let data = Bytes::from(std::mem::take(&mut self.data)); + writer.insert(&self.location, data); }; Poll::Ready(Ok(())) }