From c274680dcf2018bce6ddc91435c8d608a02d966a Mon Sep 17 00:00:00 2001 From: elijah Date: Wed, 1 Feb 2023 23:56:03 +0800 Subject: [PATCH 1/6] feat: enable caching when using object store --- Cargo.lock | 1 + src/datanode/src/instance.rs | 9 ++- src/object-store/Cargo.toml | 1 + src/object-store/src/cache.rs | 62 +++++++++++++++++++++ src/object-store/src/lib.rs | 1 + src/object-store/tests/object_store_test.rs | 48 ++++++++++++++++ 6 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 src/object-store/src/cache.rs diff --git a/Cargo.lock b/Cargo.lock index dbb67be715e2..831f624c7186 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4399,6 +4399,7 @@ name = "object-store" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "common-telemetry", "futures", "opendal", diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index a75ceab75df6..e559b3541197 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -28,8 +28,10 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; -use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; +use object_store::cache::ObjectStoreCachePolicy; +use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; use object_store::services::fs::Builder as FsBuilder; +use object_store::services::memory::Builder as MemoryBuilder; use object_store::services::oss::Builder as OSSBuilder; use object_store::services::s3::Builder as S3Builder; use object_store::{util, ObjectStore}; @@ -207,12 +209,17 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result ObjectStoreConfig::Oss { .. 
} => new_oss_object_store(store_config).await, }; + let mem_accessor = MemoryBuilder::default().build().unwrap(); object_store.map(|object_store| { object_store .layer(RetryLayer::new(ExponentialBackoff::default().with_jitter())) .layer(MetricsLayer) .layer(LoggingLayer::default()) .layer(TracingLayer) + .layer( + CacheLayer::new(ObjectStore::new(mem_accessor)) + .with_policy(ObjectStoreCachePolicy::default()), + ) }) } diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 37d78ea391b2..e617dbe77879 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +async-trait = "0.1" futures = { version = "0.3" } opendal = { version = "0.25.1", features = [ "layers-tracing", diff --git a/src/object-store/src/cache.rs b/src/object-store/src/cache.rs new file mode 100644 index 000000000000..5a4577ab650f --- /dev/null +++ b/src/object-store/src/cache.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::future::BoxFuture; +use opendal::layers::CachePolicy; +use opendal::raw::output::Reader; +use opendal::raw::{Accessor, RpRead}; +use opendal::{ErrorKind, OpRead, OpWrite, Result}; + +#[derive(Debug, Default)] +pub struct ObjectStoreCachePolicy {} + +impl ObjectStoreCachePolicy { + fn cache_path(&self, path: &str, args: &OpRead) -> String { + format!("{}.cache-{}", path, args.range().to_header()) + } +} + +#[async_trait] +impl CachePolicy for ObjectStoreCachePolicy { + fn on_read( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpRead, + ) -> BoxFuture<'static, Result<(RpRead, Reader)>> { + let path = path.to_string(); + let cache_path = self.cache_path(&path, &args); + Box::pin(async move { + match cache.read(&cache_path, OpRead::default()).await { + Ok(v) => Ok(v), + Err(err) if err.kind() == ErrorKind::ObjectNotFound => { + let (rp, reader) = inner.read(&path, args.clone()).await?; + let size = rp.clone().into_metadata().content_length(); + let _ = cache + .write(&cache_path, OpWrite::new(size), Box::new(reader)) + .await?; + match cache.read(&cache_path, OpRead::default()).await { + Ok(v) => Ok(v), + Err(_) => return inner.read(&path, args).await, + } + } + Err(_) => return inner.read(&path, args).await, + } + }) + } +} diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 1079a9f4d1db..e5ed96f95fb3 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -17,5 +17,6 @@ pub use opendal::{ Operator as ObjectStore, Result, }; pub mod backend; +pub mod cache; pub mod test_util; pub mod util; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index c52db9e10707..e7b7094ba073 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -16,9 +16,12 @@ use std::env; use anyhow::Result; use common_telemetry::logging; +use futures::TryStreamExt; 
use object_store::backend::{fs, s3}; +use object_store::cache::ObjectStoreCachePolicy; use object_store::test_util::TempFolder; use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore}; +use opendal::layers::CacheLayer; use opendal::services::oss; use tempdir::TempDir; @@ -160,3 +163,48 @@ async fn test_oss_backend() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_object_store_cache_policy() -> Result<()> { + let root_dir = TempDir::new("test_fs_backend")?; + let store = ObjectStore::new( + fs::Builder::default() + .root(&root_dir.path().to_string_lossy()) + .build()?, + ); + + let cache_dir = TempDir::new("test_fs_cache")?; + let cache_op = ObjectStore::new( + fs::Builder::default() + .root(&cache_dir.path().to_string_lossy()) + .build()?, + ); + let test_cache_op = ObjectStore::from(cache_op.inner()); + let store = + store.layer(CacheLayer::new(cache_op).with_policy(ObjectStoreCachePolicy::default())); + + // Create object handler. + let object = store.object("test_file"); + + // Write data info object; + assert!(object.write("Hello, World!").await.is_ok()); + + // Read data from object; + let bs = object.read().await?; + assert_eq!("Hello, World!", String::from_utf8(bs)?); + + // Read data from object cache; + let mut lister = test_cache_op.batch().walk_top_down("/")?; + while let Some(obj) = lister.try_next().await? { + match obj.mode().await? 
{ + ObjectMode::FILE => { + assert!(obj.name().starts_with("test_file")); + let bs = obj.read().await?; + assert_eq!("Hello, World!", String::from_utf8(bs)?); + } + _ => continue, + } + } + + Ok(()) +} From 9c8bd70dafa342c744d2b1035df1b9ca341254fc Mon Sep 17 00:00:00 2001 From: elijah Date: Thu, 2 Feb 2023 15:27:06 +0800 Subject: [PATCH 2/6] feat: support file cache for object store --- src/datanode/src/datanode.rs | 2 ++ src/datanode/src/instance.rs | 38 +++++++++++++++++++++++------- tests-integration/src/test_util.rs | 2 ++ 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 22e2231fd40e..28f1dfb74510 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -48,6 +48,7 @@ pub struct S3Config { pub secret_access_key: String, pub endpoint: Option, pub region: Option, + pub cache_path: Option, } #[derive(Debug, Clone, Serialize, Default, Deserialize)] @@ -58,6 +59,7 @@ pub struct OssConfig { pub access_key_id: String, pub access_key_secret: String, pub endpoint: String, + pub cache_path: Option, } impl Default for ObjectStoreConfig { diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index e559b3541197..dcbee3b1a821 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -31,7 +31,6 @@ use mito::engine::MitoEngine; use object_store::cache::ObjectStoreCachePolicy; use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; use object_store::services::fs::Builder as FsBuilder; -use object_store::services::memory::Builder as MemoryBuilder; use object_store::services::oss::Builder as OSSBuilder; use object_store::services::s3::Builder as S3Builder; use object_store::{util, ObjectStore}; @@ -209,17 +208,12 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result ObjectStoreConfig::Oss { .. 
} => new_oss_object_store(store_config).await, }; - let mem_accessor = MemoryBuilder::default().build().unwrap(); object_store.map(|object_store| { object_store .layer(RetryLayer::new(ExponentialBackoff::default().with_jitter())) .layer(MetricsLayer) .layer(LoggingLayer::default()) .layer(TracingLayer) - .layer( - CacheLayer::new(ObjectStore::new(mem_accessor)) - .with_policy(ObjectStoreCachePolicy::default()), - ) }) } @@ -247,7 +241,21 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re config: store_config.clone(), })?; - Ok(ObjectStore::new(accessor)) + let mut object_store = ObjectStore::new(accessor); + if let Some(path) = oss_config.cache_path.as_ref() { + let accessor = + FsBuilder::default() + .root(path) + .build() + .with_context(|_| error::InitBackendSnafu { + config: store_config.clone(), + })?; + object_store = object_store.layer( + CacheLayer::new(ObjectStore::new(accessor)) + .with_policy(ObjectStoreCachePolicy::default()), + ); + } + Ok(object_store) } pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result { @@ -280,7 +288,21 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res config: store_config.clone(), })?; - Ok(ObjectStore::new(accessor)) + let mut object_store = ObjectStore::new(accessor); + if let Some(path) = s3_config.cache_path.as_ref() { + let accessor = + FsBuilder::default() + .root(path) + .build() + .with_context(|_| error::InitBackendSnafu { + config: store_config.clone(), + })?; + object_store = object_store.layer( + CacheLayer::new(ObjectStore::new(accessor)) + .with_policy(ObjectStoreCachePolicy::default()), + ); + } + Ok(object_store) } pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result { diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 978495cec796..42121243ec5b 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -101,6 
+101,7 @@ fn get_test_store_config( access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap(), bucket: env::var("GT_OSS_BUCKET").unwrap(), endpoint: env::var("GT_OSS_ENDPOINT").unwrap(), + cache_path: None, }; let accessor = oss::Builder::default() @@ -129,6 +130,7 @@ fn get_test_store_config( bucket: env::var("GT_S3_BUCKET").unwrap(), endpoint: None, region: None, + cache_path: None, }; let accessor = s3::Builder::default() From dec0c0701c4032d1656906afbbd8911a7380cfd9 Mon Sep 17 00:00:00 2001 From: elijah Date: Sun, 5 Feb 2023 17:01:06 +0800 Subject: [PATCH 3/6] feat: maintaining the cached files with lru --- Cargo.lock | 10 ++ src/datanode/src/datanode.rs | 2 + src/datanode/src/instance.rs | 18 +-- src/object-store/Cargo.toml | 1 + src/object-store/src/cache.rs | 62 ---------- src/object-store/src/cache_policy.rs | 124 ++++++++++++++++++++ src/object-store/src/lib.rs | 2 +- src/object-store/tests/object_store_test.rs | 108 +++++++++++++---- tests-integration/src/test_util.rs | 2 + 9 files changed, 233 insertions(+), 96 deletions(-) delete mode 100644 src/object-store/src/cache.rs create mode 100644 src/object-store/src/cache_policy.rs diff --git a/Cargo.lock b/Cargo.lock index 831f624c7186..87f8a1c2ec29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3681,6 +3681,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "lz4" version = "1.24.0" @@ -4402,6 +4411,7 @@ dependencies = [ "async-trait", "common-telemetry", "futures", + "lru 0.9.0", "opendal", "tempdir", "tokio", diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 28f1dfb74510..549d18271347 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -49,6 +49,7 @@ pub struct S3Config { pub endpoint: Option, pub region: 
Option, pub cache_path: Option, + pub cache_capacity: Option, } #[derive(Debug, Clone, Serialize, Default, Deserialize)] @@ -60,6 +61,7 @@ pub struct OssConfig { pub access_key_secret: String, pub endpoint: String, pub cache_path: Option, + pub cache_capacity: Option, } impl Default for ObjectStoreConfig { diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index dcbee3b1a821..a357ec85d827 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -28,7 +28,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; -use object_store::cache::ObjectStoreCachePolicy; +use object_store::cache_policy::LruCachePolicy; use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; use object_store::services::fs::Builder as FsBuilder; use object_store::services::oss::Builder as OSSBuilder; @@ -250,10 +250,10 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re .with_context(|_| error::InitBackendSnafu { config: store_config.clone(), })?; - object_store = object_store.layer( - CacheLayer::new(ObjectStore::new(accessor)) - .with_policy(ObjectStoreCachePolicy::default()), - ); + let cache_capacity = oss_config.cache_capacity.unwrap(); + let policy = LruCachePolicy::new(cache_capacity.0 as usize); + object_store = + object_store.layer(CacheLayer::new(ObjectStore::new(accessor)).with_policy(policy)); } Ok(object_store) } @@ -297,10 +297,10 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res .with_context(|_| error::InitBackendSnafu { config: store_config.clone(), })?; - object_store = object_store.layer( - CacheLayer::new(ObjectStore::new(accessor)) - .with_policy(ObjectStoreCachePolicy::default()), - ); + let cache_capacity = s3_config.cache_capacity.unwrap(); + let policy = LruCachePolicy::new(cache_capacity.0 as usize); + 
object_store = + object_store.layer(CacheLayer::new(ObjectStore::new(accessor)).with_policy(policy)); } Ok(object_store) } diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index e617dbe77879..f18e1e9f7f82 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +lru = "0.9" async-trait = "0.1" futures = { version = "0.3" } opendal = { version = "0.25.1", features = [ diff --git a/src/object-store/src/cache.rs b/src/object-store/src/cache.rs deleted file mode 100644 index 5a4577ab650f..000000000000 --- a/src/object-store/src/cache.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::sync::Arc; - -use async_trait::async_trait; -use futures::future::BoxFuture; -use opendal::layers::CachePolicy; -use opendal::raw::output::Reader; -use opendal::raw::{Accessor, RpRead}; -use opendal::{ErrorKind, OpRead, OpWrite, Result}; - -#[derive(Debug, Default)] -pub struct ObjectStoreCachePolicy {} - -impl ObjectStoreCachePolicy { - fn cache_path(&self, path: &str, args: &OpRead) -> String { - format!("{}.cache-{}", path, args.range().to_header()) - } -} - -#[async_trait] -impl CachePolicy for ObjectStoreCachePolicy { - fn on_read( - &self, - inner: Arc, - cache: Arc, - path: &str, - args: OpRead, - ) -> BoxFuture<'static, Result<(RpRead, Reader)>> { - let path = path.to_string(); - let cache_path = self.cache_path(&path, &args); - Box::pin(async move { - match cache.read(&cache_path, OpRead::default()).await { - Ok(v) => Ok(v), - Err(err) if err.kind() == ErrorKind::ObjectNotFound => { - let (rp, reader) = inner.read(&path, args.clone()).await?; - let size = rp.clone().into_metadata().content_length(); - let _ = cache - .write(&cache_path, OpWrite::new(size), Box::new(reader)) - .await?; - match cache.read(&cache_path, OpRead::default()).await { - Ok(v) => Ok(v), - Err(_) => return inner.read(&path, args).await, - } - } - Err(_) => return inner.read(&path, args).await, - } - }) - } -} diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs new file mode 100644 index 000000000000..4cbef19293bb --- /dev/null +++ b/src/object-store/src/cache_policy.rs @@ -0,0 +1,124 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::num::NonZeroUsize; +use std::ops::DerefMut; +use std::sync::{Arc, RwLock}; + +use async_trait::async_trait; +use futures::future::BoxFuture; +use lru::LruCache; +use opendal::layers::CachePolicy; +use opendal::raw::output::Reader; +use opendal::raw::{Accessor, RpDelete, RpRead}; +use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result}; + +struct LruValue; + +#[derive(Debug)] +pub struct LruCachePolicy { + lru_cache: Arc>>, +} + +impl LruCachePolicy { + pub fn new(capacity: usize) -> Self { + Self { + lru_cache: Arc::new(RwLock::new(LruCache::new( + NonZeroUsize::new(capacity).unwrap(), + ))), + } + } + + fn cache_path(&self, path: &str, args: &OpRead) -> String { + format!("{}.cache-{}", path, args.range().to_header()) + } +} + +#[async_trait] +impl CachePolicy for LruCachePolicy { + fn on_read( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpRead, + ) -> BoxFuture<'static, Result<(RpRead, Reader)>> { + let path = path.to_string(); + let cache_path = self.cache_path(&path, &args); + let lru_cache = self.lru_cache.clone(); + Box::pin(async move { + match cache.read(&cache_path, OpRead::default()).await { + Ok(v) => { + // update lru when cache hit + let mut lru_cache = lru_cache.write().unwrap(); + lru_cache.get_or_insert(cache_path.clone(), || LruValue {}); + Ok(v) + } + Err(err) if err.kind() == ErrorKind::ObjectNotFound => { + let (rp, reader) = inner.read(&path, args.clone()).await?; + let size = rp.clone().into_metadata().content_length(); + let _ = cache + .write(&cache_path, OpWrite::new(size), Box::new(reader)) + 
.await?; + match cache.read(&cache_path, OpRead::default()).await { + Ok(v) => { + let r = { + // push new cache file name to lru + let mut lru_cache = lru_cache.write().unwrap(); + lru_cache.push(cache_path.clone(), LruValue {}) + }; + // delete the evicted cache file + if let Some((k, _v)) = r { + let _ = cache.delete(&k, OpDelete::new()).await; + } + Ok(v) + } + Err(_) => return inner.read(&path, args).await, + } + } + Err(_) => return inner.read(&path, args).await, + } + }) + } + + fn on_delete( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpDelete, + ) -> BoxFuture<'static, Result> { + let path = path.to_string(); + let lru_cache = self.lru_cache.clone(); + Box::pin(async move { + let cache_files: Vec = { + let mut guard = lru_cache.write().unwrap(); + let lru = guard.deref_mut(); + let cache_files = lru + .iter() + .filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str())) + .map(|(k, _v)| k.clone()) + .collect::>(); + for k in &cache_files { + lru.pop(k); + } + cache_files + }; + for file in cache_files { + let _ = cache.delete(&file, OpDelete::new()).await; + } + return inner.delete(&path, args).await; + }) + } +} diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index e5ed96f95fb3..cd2d3b1d798b 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -17,6 +17,6 @@ pub use opendal::{ Operator as ObjectStore, Result, }; pub mod backend; -pub mod cache; +pub mod cache_policy; pub mod test_util; pub mod util; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index e7b7094ba073..60a779122612 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -16,13 +16,13 @@ use std::env; use anyhow::Result; use common_telemetry::logging; -use futures::TryStreamExt; use object_store::backend::{fs, s3}; -use object_store::cache::ObjectStoreCachePolicy; +use object_store::cache_policy::LruCachePolicy; use 
object_store::test_util::TempFolder; use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore}; use opendal::layers::CacheLayer; use opendal::services::oss; +use opendal::Operator; use tempdir::TempDir; async fn test_object_crud(store: &ObjectStore) -> Result<()> { @@ -164,47 +164,107 @@ async fn test_oss_backend() -> Result<()> { Ok(()) } +async fn assert_cache_files( + store: &Operator, + file_names: &[&str], + file_contents: &[&str], +) -> Result<()> { + let o = store.object("/"); + let obs = o.list().await?; + let objects = util::collect(obs).await?; + + // compare the cache file with the expected cache file; ignore orders + for o in objects { + let position = file_names.iter().position(|&x| x == o.name()); + assert!(position.is_some(), "file not found: {}", o.name()); + + let position = position.unwrap(); + let bs = o.read().await?; + assert_eq!( + file_contents[position], + String::from_utf8(bs.clone())?, + "file content not match: {}", + o.name() + ); + } + + Ok(()) +} + #[tokio::test] async fn test_object_store_cache_policy() -> Result<()> { + // create file storage let root_dir = TempDir::new("test_fs_backend")?; let store = ObjectStore::new( fs::Builder::default() .root(&root_dir.path().to_string_lossy()) + .atomic_write_dir(&root_dir.path().to_string_lossy()) .build()?, ); + // create file cache layer let cache_dir = TempDir::new("test_fs_cache")?; let cache_op = ObjectStore::new( fs::Builder::default() .root(&cache_dir.path().to_string_lossy()) + .atomic_write_dir(&cache_dir.path().to_string_lossy()) .build()?, ); - let test_cache_op = ObjectStore::from(cache_op.inner()); - let store = - store.layer(CacheLayer::new(cache_op).with_policy(ObjectStoreCachePolicy::default())); + // create operator for cache dir to verify cache file + let cache_store = ObjectStore::from(cache_op.inner()); + let policy = LruCachePolicy::new(3); + let store = store.layer(CacheLayer::new(cache_op).with_policy(policy)); - // Create object handler. 
- let object = store.object("test_file"); + // create several object handlers. + let o1 = store.object("test_file1"); + let o2 = store.object("test_file2"); - // Write data info object; - assert!(object.write("Hello, World!").await.is_ok()); + // write data into object; + assert!(o1.write("Hello, object1!").await.is_ok()); + assert!(o2.write("Hello, object2!").await.is_ok()); - // Read data from object; - let bs = object.read().await?; - assert_eq!("Hello, World!", String::from_utf8(bs)?); + // create cache files by reading the objects + o1.range_read(7..).await?; + o1.read().await?; + o2.range_read(7..).await?; + o2.read().await?; + + assert_cache_files( + &cache_store, + &[ + "test_file1.cache-bytes=0-", + "test_file2.cache-bytes=7-", + "test_file2.cache-bytes=0-", + ], + &["Hello, object1!", "object2!", "Hello, object2!"], + ) + .await?; - // Read data from object cache; - let mut lister = test_cache_op.batch().walk_top_down("/")?; - while let Some(obj) = lister.try_next().await? { - match obj.mode().await?
{ - ObjectMode::FILE => { - assert!(obj.name().starts_with("test_file")); - let bs = obj.read().await?; - assert_eq!("Hello, World!", String::from_utf8(bs)?); - } - _ => continue, - } - } + assert!(o2.delete().await.is_ok()); + + assert_cache_files( + &cache_store, + &["test_file1.cache-bytes=0-"], + &["Hello, object1!"], + ) + .await?; + + let o3 = store.object("test_file3"); + assert!(o3.write("Hello, object3!").await.is_ok()); + + o3.read().await?; + o3.range_read(0..5).await?; + + assert_cache_files( + &cache_store, + &[ + "test_file1.cache-bytes=0-", + "test_file3.cache-bytes=0-", + "test_file3.cache-bytes=0-4", + ], + &["Hello, object1!", "Hello, object3!", "Hello"], + ) + .await?; Ok(()) } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 42121243ec5b..a2de9cf268a9 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -102,6 +102,7 @@ fn get_test_store_config( bucket: env::var("GT_OSS_BUCKET").unwrap(), endpoint: env::var("GT_OSS_ENDPOINT").unwrap(), cache_path: None, + cache_capacity: None, }; let accessor = oss::Builder::default() @@ -131,6 +132,7 @@ fn get_test_store_config( endpoint: None, region: None, cache_path: None, + cache_capacity: None, }; let accessor = s3::Builder::default() From 47bcf6c8d99c9d52a4d4450b866784af093559dd Mon Sep 17 00:00:00 2001 From: elijah Date: Tue, 7 Feb 2023 00:15:23 +0800 Subject: [PATCH 4/6] fix: improve the code --- src/datanode/src/datanode.rs | 2 + src/datanode/src/instance.rs | 68 ++++++++++++++++------------ src/object-store/src/cache_policy.rs | 13 +++--- 3 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 549d18271347..bf2a43a4ff29 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -25,6 +25,8 @@ use crate::error::Result; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; +pub const 
DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum ObjectStoreConfig { diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index a357ec85d827..3aa6ac983ce6 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -19,6 +19,7 @@ use std::{fs, path}; use backon::ExponentialBackoff; use catalog::remote::MetaKvBackend; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; +use common_base::readable_size::ReadableSize; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_telemetry::logging::info; @@ -43,7 +44,9 @@ use table::table::numbers::NumbersTable; use table::table::TableIdProviderRef; use table::Table; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; +use crate::datanode::{ + DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE, +}; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, Result, @@ -241,21 +244,44 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re config: store_config.clone(), })?; - let mut object_store = ObjectStore::new(accessor); - if let Some(path) = oss_config.cache_path.as_ref() { - let accessor = - FsBuilder::default() - .root(path) - .build() - .with_context(|_| error::InitBackendSnafu { + create_object_store_with_cache(ObjectStore::new(accessor), store_config) +} + +fn create_object_store_with_cache( + object_store: ObjectStore, + store_config: &ObjectStoreConfig, +) -> Result { + let (cache_path, cache_capacity) = match store_config { + ObjectStoreConfig::S3(s3_config) => { + let path = s3_config.cache_path.as_ref(); + let capacity = s3_config + .cache_capacity + 
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + ObjectStoreConfig::Oss(oss_config) => { + let path = oss_config.cache_path.as_ref(); + let capacity = oss_config + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + _ => (None, ReadableSize(0)), + }; + + if let Some(path) = cache_path { + let cache_store = + ObjectStore::new(FsBuilder::default().root(path).build().with_context(|_| { + error::InitBackendSnafu { config: store_config.clone(), - })?; - let cache_capacity = oss_config.cache_capacity.unwrap(); + } + })?); let policy = LruCachePolicy::new(cache_capacity.0 as usize); - object_store = - object_store.layer(CacheLayer::new(ObjectStore::new(accessor)).with_policy(policy)); + let cache_layer = CacheLayer::new(cache_store).with_policy(policy); + Ok(object_store.layer(cache_layer)) + } else { + Ok(object_store) } - Ok(object_store) } pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result { @@ -288,21 +314,7 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res config: store_config.clone(), })?; - let mut object_store = ObjectStore::new(accessor); - if let Some(path) = s3_config.cache_path.as_ref() { - let accessor = - FsBuilder::default() - .root(path) - .build() - .with_context(|_| error::InitBackendSnafu { - config: store_config.clone(), - })?; - let cache_capacity = s3_config.cache_capacity.unwrap(); - let policy = LruCachePolicy::new(cache_capacity.0 as usize); - object_store = - object_store.layer(CacheLayer::new(ObjectStore::new(accessor)).with_policy(policy)); - } - Ok(object_store) + create_object_store_with_cache(ObjectStore::new(accessor), store_config) } pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result { diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs index 4cbef19293bb..ce986bad85e2 100644 --- a/src/object-store/src/cache_policy.rs +++ 
b/src/object-store/src/cache_policy.rs @@ -14,7 +14,7 @@ use std::num::NonZeroUsize; use std::ops::DerefMut; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use async_trait::async_trait; use futures::future::BoxFuture; @@ -23,18 +23,19 @@ use opendal::layers::CachePolicy; use opendal::raw::output::Reader; use opendal::raw::{Accessor, RpDelete, RpRead}; use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result}; +use tokio::sync::Mutex; struct LruValue; #[derive(Debug)] pub struct LruCachePolicy { - lru_cache: Arc>>, + lru_cache: Arc>>, } impl LruCachePolicy { pub fn new(capacity: usize) -> Self { Self { - lru_cache: Arc::new(RwLock::new(LruCache::new( + lru_cache: Arc::new(Mutex::new(LruCache::new( NonZeroUsize::new(capacity).unwrap(), ))), } @@ -61,7 +62,7 @@ impl CachePolicy for LruCachePolicy { match cache.read(&cache_path, OpRead::default()).await { Ok(v) => { // update lru when cache hit - let mut lru_cache = lru_cache.write().unwrap(); + let mut lru_cache = lru_cache.lock().await; lru_cache.get_or_insert(cache_path.clone(), || LruValue {}); Ok(v) } @@ -75,7 +76,7 @@ impl CachePolicy for LruCachePolicy { Ok(v) => { let r = { // push new cache file name to lru - let mut lru_cache = lru_cache.write().unwrap(); + let mut lru_cache = lru_cache.lock().await; lru_cache.push(cache_path.clone(), LruValue {}) }; // delete the evicted cache file @@ -103,7 +104,7 @@ impl CachePolicy for LruCachePolicy { let lru_cache = self.lru_cache.clone(); Box::pin(async move { let cache_files: Vec = { - let mut guard = lru_cache.write().unwrap(); + let mut guard = lru_cache.lock().await; let lru = guard.deref_mut(); let cache_files = lru .iter() From 20a0ddcd102e185af70077ae9183e55228b441ba Mon Sep 17 00:00:00 2001 From: elijah Date: Tue, 7 Feb 2023 08:52:37 +0800 Subject: [PATCH 5/6] empty commit From b35d8d3e108905a908ad86d51a365f1d40bc8549 Mon Sep 17 00:00:00 2001 From: elijah Date: Tue, 7 Feb 2023 15:13:07 +0800 Subject: [PATCH 6/6] improve the code --- 
src/object-store/src/cache_policy.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs index ce986bad85e2..89919bb6a582 100644 --- a/src/object-store/src/cache_policy.rs +++ b/src/object-store/src/cache_policy.rs @@ -25,11 +25,9 @@ use opendal::raw::{Accessor, RpDelete, RpRead}; use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result}; use tokio::sync::Mutex; -struct LruValue; - #[derive(Debug)] pub struct LruCachePolicy { - lru_cache: Arc<Mutex<LruCache<String, LruValue>>>, + lru_cache: Arc<Mutex<LruCache<String, ()>>>, } impl LruCachePolicy { @@ -63,7 +61,7 @@ impl CachePolicy for LruCachePolicy { Ok(v) => { // update lru when cache hit let mut lru_cache = lru_cache.lock().await; - lru_cache.get_or_insert(cache_path.clone(), || LruValue {}); + lru_cache.get_or_insert(cache_path.clone(), || ()); Ok(v) } Err(err) if err.kind() == ErrorKind::ObjectNotFound => { @@ -77,7 +75,7 @@ impl CachePolicy for LruCachePolicy { let r = { // push new cache file name to lru let mut lru_cache = lru_cache.lock().await; - lru_cache.push(cache_path.clone(), LruValue {}) + lru_cache.push(cache_path.clone(), ()) }; // delete the evicted cache file if let Some((k, _v)) = r {