diff --git a/Cargo.lock b/Cargo.lock
index dbb67be715e2..87f8a1c2ec29 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3681,6 +3681,15 @@ dependencies = [
  "hashbrown 0.12.3",
 ]
 
+[[package]]
+name = "lru"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17"
+dependencies = [
+ "hashbrown 0.13.2",
+]
+
 [[package]]
 name = "lz4"
 version = "1.24.0"
@@ -4399,8 +4408,10 @@ name = "object-store"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "common-telemetry",
  "futures",
+ "lru 0.9.0",
  "opendal",
  "tempdir",
  "tokio",
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 22e2231fd40e..bf2a43a4ff29 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -25,6 +25,8 @@ use crate::error::Result;
 use crate::instance::{Instance, InstanceRef};
 use crate::server::Services;
 
+pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(tag = "type")]
 pub enum ObjectStoreConfig {
@@ -48,6 +50,8 @@ pub struct S3Config {
     pub secret_access_key: String,
     pub endpoint: Option<String>,
     pub region: Option<String>,
+    pub cache_path: Option<String>,
+    pub cache_capacity: Option<ReadableSize>,
 }
 
 #[derive(Debug, Clone, Serialize, Default, Deserialize)]
@@ -58,6 +62,8 @@ pub struct OssConfig {
     pub access_key_id: String,
     pub access_key_secret: String,
     pub endpoint: String,
+    pub cache_path: Option<String>,
+    pub cache_capacity: Option<ReadableSize>,
 }
 
 impl Default for ObjectStoreConfig {
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index a75ceab75df6..3aa6ac983ce6 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -19,6 +19,7 @@ use std::{fs, path};
 use backon::ExponentialBackoff;
 use catalog::remote::MetaKvBackend;
 use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
+use common_base::readable_size::ReadableSize;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_telemetry::logging::info;
@@ -28,7 +29,8 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
 use meta_client::MetaClientOpts;
 use mito::config::EngineConfig as TableEngineConfig;
 use mito::engine::MitoEngine;
-use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
+use object_store::cache_policy::LruCachePolicy;
+use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
 use object_store::services::fs::Builder as FsBuilder;
 use object_store::services::oss::Builder as OSSBuilder;
 use object_store::services::s3::Builder as S3Builder;
@@ -42,7 +44,9 @@ use table::table::numbers::NumbersTable;
 use table::table::TableIdProviderRef;
 use table::Table;
 
-use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
+use crate::datanode::{
+    DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
+};
 use crate::error::{
     self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
     NewCatalogSnafu, OpenLogStoreSnafu, Result,
@@ -240,7 +244,44 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re
         config: store_config.clone(),
     })?;
 
-    Ok(ObjectStore::new(accessor))
+    create_object_store_with_cache(ObjectStore::new(accessor), store_config)
+}
+
+fn create_object_store_with_cache(
+    object_store: ObjectStore,
+    store_config: &ObjectStoreConfig,
+) -> Result<ObjectStore> {
+    let (cache_path, cache_capacity) = match store_config {
+        ObjectStoreConfig::S3(s3_config) => {
+            let path = s3_config.cache_path.as_ref();
+            let capacity = s3_config
+                .cache_capacity
+                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
+            (path, capacity)
+        }
+        ObjectStoreConfig::Oss(oss_config) => {
+            let path = oss_config.cache_path.as_ref();
+            let capacity = oss_config
+                .cache_capacity
+                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
+            (path, capacity)
+        }
+        _ => (None, ReadableSize(0)),
+    };
+
+    if let Some(path) = cache_path {
+        let cache_store =
+            ObjectStore::new(FsBuilder::default().root(path).build().with_context(|_| {
+                error::InitBackendSnafu {
+                    config: store_config.clone(),
+                }
+            })?);
+        let policy = LruCachePolicy::new(cache_capacity.0 as usize);
+        let cache_layer = CacheLayer::new(cache_store).with_policy(policy);
+        Ok(object_store.layer(cache_layer))
+    } else {
+        Ok(object_store)
+    }
 }
 
 pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
@@ -273,7 +314,7 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
         config: store_config.clone(),
     })?;
 
-    Ok(ObjectStore::new(accessor))
+    create_object_store_with_cache(ObjectStore::new(accessor), store_config)
 }
 
 pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
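Note (illustrative sketch, not part of the patch): with the fields plumbed through above, enabling the read cache only requires setting `cache_path` on the storage config; `cache_capacity` falls back to `DEFAULT_OBJECT_STORE_CACHE_SIZE` when unset. Also note that `create_object_store_with_cache` passes `cache_capacity.0` straight to `LruCachePolicy::new`, so the value bounds the number of cached files, not their total size in bytes. Since `OssConfig` derives `Default` (see the datanode.rs hunk), a config could be built like this, with placeholder values:

use common_base::readable_size::ReadableSize;

use crate::datanode::{ObjectStoreConfig, OssConfig};

// Sketch only: field values are placeholders.
fn example_store_config() -> ObjectStoreConfig {
    ObjectStoreConfig::Oss(OssConfig {
        bucket: "my-bucket".to_string(),
        // local directory that backs the cache
        cache_path: Some("/var/cache/greptimedb".to_string()),
        // used as LruCachePolicy::new(capacity.0 as usize): an entry count
        cache_capacity: Some(ReadableSize(4096)),
        ..Default::default()
    })
}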
diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml
index 37d78ea391b2..f18e1e9f7f82 100644
--- a/src/object-store/Cargo.toml
+++ b/src/object-store/Cargo.toml
@@ -5,6 +5,8 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+lru = "0.9"
+async-trait = "0.1"
 futures = { version = "0.3" }
 opendal = { version = "0.25.1", features = [
     "layers-tracing",
diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs
new file mode 100644
index 000000000000..89919bb6a582
--- /dev/null
+++ b/src/object-store/src/cache_policy.rs
@@ -0,0 +1,123 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::num::NonZeroUsize;
+use std::ops::DerefMut;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use futures::future::BoxFuture;
+use lru::LruCache;
+use opendal::layers::CachePolicy;
+use opendal::raw::output::Reader;
+use opendal::raw::{Accessor, RpDelete, RpRead};
+use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result};
+use tokio::sync::Mutex;
+
+#[derive(Debug)]
+pub struct LruCachePolicy {
+    lru_cache: Arc<Mutex<LruCache<String, ()>>>,
+}
+
+impl LruCachePolicy {
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            lru_cache: Arc::new(Mutex::new(LruCache::new(
+                NonZeroUsize::new(capacity).unwrap(),
+            ))),
+        }
+    }
+
+    fn cache_path(&self, path: &str, args: &OpRead) -> String {
+        format!("{}.cache-{}", path, args.range().to_header())
+    }
+}
+
+#[async_trait]
+impl CachePolicy for LruCachePolicy {
+    fn on_read(
+        &self,
+        inner: Arc<dyn Accessor>,
+        cache: Arc<dyn Accessor>,
+        path: &str,
+        args: OpRead,
+    ) -> BoxFuture<'static, Result<(RpRead, Reader)>> {
+        let path = path.to_string();
+        let cache_path = self.cache_path(&path, &args);
+        let lru_cache = self.lru_cache.clone();
+        Box::pin(async move {
+            match cache.read(&cache_path, OpRead::default()).await {
+                Ok(v) => {
+                    // update lru when cache hit
+                    let mut lru_cache = lru_cache.lock().await;
+                    lru_cache.get_or_insert(cache_path.clone(), || ());
+                    Ok(v)
+                }
+                Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
+                    let (rp, reader) = inner.read(&path, args.clone()).await?;
+                    let size = rp.clone().into_metadata().content_length();
+                    let _ = cache
+                        .write(&cache_path, OpWrite::new(size), Box::new(reader))
+                        .await?;
+                    match cache.read(&cache_path, OpRead::default()).await {
+                        Ok(v) => {
+                            let r = {
+                                // push new cache file name to lru
+                                let mut lru_cache = lru_cache.lock().await;
+                                lru_cache.push(cache_path.clone(), ())
+                            };
+                            // delete the evicted cache file
+                            if let Some((k, _v)) = r {
+                                let _ = cache.delete(&k, OpDelete::new()).await;
+                            }
+                            Ok(v)
+                        }
+                        Err(_) => return inner.read(&path, args).await,
+                    }
+                }
+                Err(_) => return inner.read(&path, args).await,
+            }
+        })
+    }
+
+    fn on_delete(
+        &self,
+        inner: Arc<dyn Accessor>,
+        cache: Arc<dyn Accessor>,
+        path: &str,
+        args: OpDelete,
+    ) -> BoxFuture<'static, Result<RpDelete>> {
+        let path = path.to_string();
+        let lru_cache = self.lru_cache.clone();
+        Box::pin(async move {
+            let cache_files: Vec<String> = {
+                let mut guard = lru_cache.lock().await;
+                let lru = guard.deref_mut();
+                let cache_files = lru
+                    .iter()
+                    .filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str()))
+                    .map(|(k, _v)| k.clone())
+                    .collect::<Vec<_>>();
+                for k in &cache_files {
+                    lru.pop(k);
+                }
+                cache_files
+            };
+            for file in cache_files {
+                let _ = cache.delete(&file, OpDelete::new()).await;
+            }
+            return inner.delete(&path, args).await;
+        })
+    }
+}
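Note on the eviction mechanics above (a minimal sketch of the `lru` crate behavior the policy relies on; not part of the patch): `LruCache::push` returns the least-recently-used entry it displaces once capacity is reached, and `on_read` deletes exactly that file from the cache backend.

use std::num::NonZeroUsize;

use lru::LruCache;

fn main() {
    // capacity 2, mirroring the policy's LruCache<String, ()>
    let mut lru: LruCache<String, ()> = LruCache::new(NonZeroUsize::new(2).unwrap());
    assert_eq!(lru.push("a.cache-bytes=0-".to_string(), ()), None);
    assert_eq!(lru.push("b.cache-bytes=0-".to_string(), ()), None);

    // touching "a" makes "b" the least recently used entry
    lru.get("a.cache-bytes=0-");

    // at capacity, push returns the evicted (key, value) pair;
    // on_read uses the returned key to delete the stale cache file
    let evicted = lru.push("c.cache-bytes=0-".to_string(), ());
    assert_eq!(evicted, Some(("b.cache-bytes=0-".to_string(), ())));
}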
diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs
index 1079a9f4d1db..cd2d3b1d798b 100644
--- a/src/object-store/src/lib.rs
+++ b/src/object-store/src/lib.rs
@@ -17,5 +17,6 @@ pub use opendal::{
     Operator as ObjectStore, Result,
 };
 pub mod backend;
+pub mod cache_policy;
 pub mod test_util;
 pub mod util;
diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs
index c52db9e10707..60a779122612 100644
--- a/src/object-store/tests/object_store_test.rs
+++ b/src/object-store/tests/object_store_test.rs
@@ -17,9 +17,12 @@ use std::env;
 use anyhow::Result;
 use common_telemetry::logging;
 use object_store::backend::{fs, s3};
+use object_store::cache_policy::LruCachePolicy;
 use object_store::test_util::TempFolder;
 use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
+use opendal::layers::CacheLayer;
 use opendal::services::oss;
+use opendal::Operator;
 use tempdir::TempDir;
 
 async fn test_object_crud(store: &ObjectStore) -> Result<()> {
@@ -160,3 +163,108 @@ async fn test_oss_backend() -> Result<()> {
 
     Ok(())
 }
+
+async fn assert_cache_files(
+    store: &Operator,
+    file_names: &[&str],
+    file_contents: &[&str],
+) -> Result<()> {
+    let o = store.object("/");
+    let obs = o.list().await?;
+    let objects = util::collect(obs).await?;
+
+    // compare the cache files with the expected ones, ignoring order
+    for o in objects {
+        let position = file_names.iter().position(|&x| x == o.name());
+        assert!(position.is_some(), "file not found: {}", o.name());
+
+        let position = position.unwrap();
+        let bs = o.read().await?;
+        assert_eq!(
+            file_contents[position],
+            String::from_utf8(bs.clone())?,
+            "file content does not match: {}",
+            o.name()
+        );
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_object_store_cache_policy() -> Result<()> {
+    // create the file storage
+    let root_dir = TempDir::new("test_fs_backend")?;
+    let store = ObjectStore::new(
+        fs::Builder::default()
+            .root(&root_dir.path().to_string_lossy())
+            .atomic_write_dir(&root_dir.path().to_string_lossy())
+            .build()?,
+    );
+
+    // create the file cache layer
+    let cache_dir = TempDir::new("test_fs_cache")?;
+    let cache_op = ObjectStore::new(
+        fs::Builder::default()
+            .root(&cache_dir.path().to_string_lossy())
+            .atomic_write_dir(&cache_dir.path().to_string_lossy())
+            .build()?,
+    );
+    // create an operator for the cache dir to verify cache files
+    let cache_store = ObjectStore::from(cache_op.inner());
+    let policy = LruCachePolicy::new(3);
+    let store = store.layer(CacheLayer::new(cache_op).with_policy(policy));
+
+    // create several object handlers
+    let o1 = store.object("test_file1");
+    let o2 = store.object("test_file2");
+
+    // write data into the objects
+    assert!(o1.write("Hello, object1!").await.is_ok());
+    assert!(o2.write("Hello, object2!").await.is_ok());
+
+    // populate the cache by reading the objects
+    o1.range_read(7..).await?;
+    o1.read().await?;
+    o2.range_read(7..).await?;
+    o2.read().await?;
+
+    assert_cache_files(
+        &cache_store,
+        &[
+            "test_file1.cache-bytes=0-",
+            "test_file2.cache-bytes=7-",
+            "test_file2.cache-bytes=0-",
+        ],
+        &["Hello, object1!", "object2!", "Hello, object2!"],
+    )
+    .await?;
+
+    assert!(o2.delete().await.is_ok());
+
+    assert_cache_files(
+        &cache_store,
+        &["test_file1.cache-bytes=0-"],
+        &["Hello, object1!"],
+    )
+    .await?;
+
+    let o3 = store.object("test_file3");
+    assert!(o3.write("Hello, object3!").await.is_ok());
+
+    o3.read().await?;
+    o3.range_read(0..5).await?;
+
+    assert_cache_files(
+        &cache_store,
+        &[
+            "test_file1.cache-bytes=0-",
+            "test_file3.cache-bytes=0-",
+            "test_file3.cache-bytes=0-4",
+        ],
+        &["Hello, object1!", "Hello, object3!", "Hello"],
+    )
+    .await?;
+
+    Ok(())
+}
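A detail behind the file names asserted above (explanatory sketch, not part of the patch): cache keys come from `LruCachePolicy::cache_path`, which appends the HTTP `Range` rendering of the read (`args.range().to_header()`). HTTP ranges use inclusive end offsets, so `range_read(0..5)` maps to `bytes=0-4` and `range_read(7..)` to `bytes=7-`. The hypothetical `range_header` helper below mimics that rendering:

use std::ops::{Bound, RangeBounds};

// Hypothetical helper for illustration; the real rendering is done by
// opendal's `args.range().to_header()`.
fn range_header(range: impl RangeBounds<u64>) -> String {
    let start = match range.start_bound() {
        Bound::Included(&s) => s,
        Bound::Excluded(&s) => s + 1,
        Bound::Unbounded => 0,
    };
    match range.end_bound() {
        // Rust's exclusive end 5 becomes HTTP's inclusive end 4
        Bound::Excluded(&e) => format!("bytes={}-{}", start, e - 1),
        Bound::Included(&e) => format!("bytes={}-{}", start, e),
        Bound::Unbounded => format!("bytes={}-", start),
    }
}

fn main() {
    assert_eq!(range_header(0..5), "bytes=0-4");
    assert_eq!(range_header(7..), "bytes=7-");
    assert_eq!(
        format!("{}.cache-{}", "test_file3", range_header(0..5)),
        "test_file3.cache-bytes=0-4"
    );
}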
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 978495cec796..a2de9cf268a9 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -101,6 +101,8 @@ fn get_test_store_config(
         access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap(),
         bucket: env::var("GT_OSS_BUCKET").unwrap(),
         endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
+        cache_path: None,
+        cache_capacity: None,
     };
 
     let accessor = oss::Builder::default()
@@ -129,6 +131,8 @@ fn get_test_store_config(
         bucket: env::var("GT_S3_BUCKET").unwrap(),
         endpoint: None,
         region: None,
+        cache_path: None,
+        cache_capacity: None,
     };
 
     let accessor = s3::Builder::default()