Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable caching when using object store #928

Merged
merged 6 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use crate::error::Result;
use crate::instance::{Instance, InstanceRef};
use crate::server::Services;

/// Fallback cache capacity used when an S3/OSS config leaves `cache_capacity` unset.
// NOTE(review): the raw inner value is handed to `LruCachePolicy::new` as a number of
// cache entries, not as a byte size — confirm which unit is intended.
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
Expand All @@ -48,6 +50,8 @@ pub struct S3Config {
pub secret_access_key: String,
pub endpoint: Option<String>,
pub region: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}

#[derive(Debug, Clone, Serialize, Default, Deserialize)]
Expand All @@ -58,6 +62,8 @@ pub struct OssConfig {
pub access_key_id: String,
pub access_key_secret: String,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}

impl Default for ObjectStoreConfig {
Expand Down
49 changes: 45 additions & 4 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{fs, path};
use backon::ExponentialBackoff;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
Expand All @@ -28,7 +29,8 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::cache_policy::LruCachePolicy;
use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::fs::Builder as FsBuilder;
use object_store::services::oss::Builder as OSSBuilder;
use object_store::services::s3::Builder as S3Builder;
Expand All @@ -42,7 +44,9 @@ use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use table::Table;

use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use crate::datanode::{
DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
Expand Down Expand Up @@ -240,7 +244,44 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re
config: store_config.clone(),
})?;

Ok(ObjectStore::new(accessor))
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
}

/// Wraps `object_store` in a local file-system cache layer when the S3/OSS
/// configuration provides a `cache_path`.
///
/// The store is returned unchanged when:
/// - the configuration is neither S3 nor OSS,
/// - no `cache_path` is configured, or
/// - the effective capacity is zero (treated as "caching disabled"; building an
///   LRU with zero capacity would otherwise panic).
///
/// # Errors
///
/// Returns an `InitBackend` error if the file-system backend for the cache
/// directory cannot be built.
fn create_object_store_with_cache(
    object_store: ObjectStore,
    store_config: &ObjectStoreConfig,
) -> Result<ObjectStore> {
    let (cache_path, cache_capacity) = match store_config {
        ObjectStoreConfig::S3(s3_config) => {
            (s3_config.cache_path.as_ref(), s3_config.cache_capacity)
        }
        ObjectStoreConfig::Oss(oss_config) => {
            (oss_config.cache_path.as_ref(), oss_config.cache_capacity)
        }
        _ => (None, None),
    };

    let path = match cache_path {
        Some(path) => path,
        None => return Ok(object_store),
    };

    // NOTE(review): the raw `ReadableSize` value is used as the number of LRU
    // entries, not as a byte budget — confirm the intended unit.
    // The `as` cast is kept from the original; it is lossless on 64-bit targets.
    let capacity = cache_capacity.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE).0 as usize;
    if capacity == 0 {
        // A zero-capacity cache cannot hold anything, so skip the layer entirely
        // instead of panicking while constructing the LRU.
        return Ok(object_store);
    }

    let cache_accessor = FsBuilder::default().root(path).build().with_context(|_| {
        error::InitBackendSnafu {
            config: store_config.clone(),
        }
    })?;
    let cache_store = ObjectStore::new(cache_accessor);
    let policy = LruCachePolicy::new(capacity);
    let cache_layer = CacheLayer::new(cache_store).with_policy(policy);

    Ok(object_store.layer(cache_layer))
}

pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
Expand Down Expand Up @@ -273,7 +314,7 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
config: store_config.clone(),
})?;

Ok(ObjectStore::new(accessor))
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
}

pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
Expand Down
2 changes: 2 additions & 0 deletions src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ edition.workspace = true
license.workspace = true

[dependencies]
lru = "0.9"
async-trait = "0.1"
futures = { version = "0.3" }
opendal = { version = "0.25.1", features = [
"layers-tracing",
Expand Down
123 changes: 123 additions & 0 deletions src/object-store/src/cache_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::sync::Arc;

use async_trait::async_trait;
use futures::future::BoxFuture;
use lru::LruCache;
use opendal::layers::CachePolicy;
use opendal::raw::output::Reader;
use opendal::raw::{Accessor, RpDelete, RpRead};
use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result};
use tokio::sync::Mutex;

/// Cache policy that remembers the names of cache files in an LRU list and
/// deletes the cache file whose entry gets evicted from that list.
#[derive(Debug)]
pub struct LruCachePolicy {
// Keys are cache file paths (see `cache_path`); the `()` values carry no data,
// the LRU is only used to track recency of those paths.
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}

impl LruCachePolicy {
    /// Creates a policy that keeps track of at most `capacity` cache files.
    ///
    /// A `capacity` of zero is raised to one instead of panicking, so a
    /// misconfigured value cannot abort the process.
    pub fn new(capacity: usize) -> Self {
        let capacity =
            NonZeroUsize::new(capacity.max(1)).expect("`capacity.max(1)` is never zero");
        Self {
            lru_cache: Arc::new(Mutex::new(LruCache::new(capacity))),
        }
    }

    /// Builds the cache file name for reading `path` with the byte range in `args`.
    ///
    /// Each distinct range of the same object gets its own cache file, named
    /// `<path>.cache-<range header>`.
    fn cache_path(&self, path: &str, args: &OpRead) -> String {
        format!("{}.cache-{}", path, args.range().to_header())
    }
}

#[async_trait]
impl CachePolicy for LruCachePolicy {
    /// Serves a read from the cache, filling the cache from `inner` on a miss.
    ///
    /// - Cache hit: the cached reader is returned.
    /// - Cache miss (`ObjectNotFound`): the data is read from `inner`, written to
    ///   the cache under a range-specific name and then served from the cache.
    /// - Any other cache failure (including a failed cache write): the read is
    ///   served directly from `inner`, so a broken cache never fails a read.
    ///
    /// Every read served from the cache marks its cache file as most recently
    /// used; if that evicts another entry, the evicted cache file is deleted
    /// (best effort).
    fn on_read(
        &self,
        inner: Arc<dyn Accessor>,
        cache: Arc<dyn Accessor>,
        path: &str,
        args: OpRead,
    ) -> BoxFuture<'static, Result<(RpRead, Reader)>> {
        let path = path.to_string();
        let cache_path = self.cache_path(&path, &args);
        let lru_cache = self.lru_cache.clone();
        Box::pin(async move {
            let served = match cache.read(&cache_path, OpRead::default()).await {
                Ok(hit) => hit,
                Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
                    let (rp, reader) = inner.read(&path, args.clone()).await?;
                    let size = rp.into_metadata().content_length();
                    let written = cache
                        .write(&cache_path, OpWrite::new(size), Box::new(reader))
                        .await;
                    if written.is_err() {
                        // Drop whatever may have been written so that a later read
                        // cannot hit an incomplete cache file, then bypass the cache.
                        let _ = cache.delete(&cache_path, OpDelete::new()).await;
                        return inner.read(&path, args).await;
                    }
                    match cache.read(&cache_path, OpRead::default()).await {
                        Ok(filled) => filled,
                        Err(_) => return inner.read(&path, args).await,
                    }
                }
                Err(_) => return inner.read(&path, args).await,
            };

            // Record the access. `push` hands back either the entry evicted to make
            // room or the previous entry stored under the very same key.
            let displaced = {
                let mut lru = lru_cache.lock().await;
                lru.push(cache_path.clone(), ())
            };
            if let Some((victim, _)) = displaced {
                // Only a *different* key means a real eviction; deleting our own key
                // would remove the cache file that is being served right now.
                if victim != cache_path {
                    let _ = cache.delete(&victim, OpDelete::new()).await;
                }
            }

            Ok(served)
        })
    }

    /// Deletes `path` from `inner` after dropping every tracked cache file that
    /// belongs to it.
    ///
    /// Only cache files currently tracked in the LRU are removed; their deletion
    /// is best effort and never prevents the delete on `inner`.
    fn on_delete(
        &self,
        inner: Arc<dyn Accessor>,
        cache: Arc<dyn Accessor>,
        path: &str,
        args: OpDelete,
    ) -> BoxFuture<'static, Result<RpDelete>> {
        let path = path.to_string();
        let lru_cache = self.lru_cache.clone();
        Box::pin(async move {
            // Built once instead of once per LRU entry.
            let prefix = format!("{path}.cache-");

            let cache_files: Vec<String> = {
                let mut guard = lru_cache.lock().await;
                let lru = guard.deref_mut();
                let matched = lru
                    .iter()
                    .filter(|(k, _v)| k.starts_with(prefix.as_str()))
                    .map(|(k, _v)| k.clone())
                    .collect::<Vec<_>>();
                for k in &matched {
                    lru.pop(k);
                }
                matched
            };

            // The lock is released before touching storage.
            for file in cache_files {
                let _ = cache.delete(&file, OpDelete::new()).await;
            }

            inner.delete(&path, args).await
        })
    }
}
1 change: 1 addition & 0 deletions src/object-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ pub use opendal::{
Operator as ObjectStore, Result,
};
pub mod backend;
pub mod cache_policy;
pub mod test_util;
pub mod util;
108 changes: 108 additions & 0 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ use std::env;
use anyhow::Result;
use common_telemetry::logging;
use object_store::backend::{fs, s3};
use object_store::cache_policy::LruCachePolicy;
use object_store::test_util::TempFolder;
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
use opendal::layers::CacheLayer;
use opendal::services::oss;
use opendal::Operator;
use tempdir::TempDir;

async fn test_object_crud(store: &ObjectStore) -> Result<()> {
Expand Down Expand Up @@ -160,3 +163,108 @@ async fn test_oss_backend() -> Result<()> {

Ok(())
}

/// Asserts that the root of `store` contains exactly the files in `file_names`
/// (in any order) and that each file's content equals the entry of
/// `file_contents` at the same index.
async fn assert_cache_files(
    store: &Operator,
    file_names: &[&str],
    file_contents: &[&str],
) -> Result<()> {
    assert_eq!(
        file_names.len(),
        file_contents.len(),
        "every expected file name needs an expected content"
    );

    let root = store.object("/");
    let lister = root.list().await?;
    let objects = util::collect(lister).await?;

    // Without this check an empty (or partially filled) cache directory would
    // pass, because the loop below only validates files that are present.
    assert_eq!(
        file_names.len(),
        objects.len(),
        "unexpected number of cache files"
    );

    // compare the cache file with the expected cache file; ignore orders
    for object in objects {
        let position = file_names
            .iter()
            .position(|&name| name == object.name())
            .unwrap_or_else(|| panic!("file not found: {}", object.name()));

        let bytes = object.read().await?;
        assert_eq!(
            file_contents[position],
            String::from_utf8(bytes)?,
            "file content not match: {}",
            object.name()
        );
    }

    Ok(())
}

/// Exercises `LruCachePolicy` on top of two local file-system stores: cache
/// files are created per read range, evicted once the capacity is exceeded and
/// removed when the source object is deleted.
#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
    // create file storage
    let root_dir = TempDir::new("test_fs_backend")?;
    let store = ObjectStore::new(
        fs::Builder::default()
            .root(&root_dir.path().to_string_lossy())
            .atomic_write_dir(&root_dir.path().to_string_lossy())
            .build()?,
    );

    // create file cache layer
    let cache_dir = TempDir::new("test_fs_cache")?;
    let cache_op = ObjectStore::new(
        fs::Builder::default()
            .root(&cache_dir.path().to_string_lossy())
            .atomic_write_dir(&cache_dir.path().to_string_lossy())
            .build()?,
    );
    // create operator for cache dir to verify cache files
    let cache_store = ObjectStore::from(cache_op.inner());
    let policy = LruCachePolicy::new(3);
    let store = store.layer(CacheLayer::new(cache_op).with_policy(policy));

    // create several object handlers
    let o1 = store.object("test_file1");
    let o2 = store.object("test_file2");

    // write data into the objects; `?` surfaces the underlying error on failure
    o1.write("Hello, object1!").await?;
    o2.write("Hello, object2!").await?;

    // create cache files by reading the objects
    o1.range_read(7..).await?;
    o1.read().await?;
    o2.range_read(7..).await?;
    o2.read().await?;

    // Four ranges were read with a capacity of three, so the oldest entry
    // ("test_file1.cache-bytes=7-") has been evicted and its file removed.
    assert_cache_files(
        &cache_store,
        &[
            "test_file1.cache-bytes=0-",
            "test_file2.cache-bytes=7-",
            "test_file2.cache-bytes=0-",
        ],
        &["Hello, object1!", "object2!", "Hello, object2!"],
    )
    .await?;

    // deleting an object also removes all of its cache files
    o2.delete().await?;

    assert_cache_files(
        &cache_store,
        &["test_file1.cache-bytes=0-"],
        &["Hello, object1!"],
    )
    .await?;

    let o3 = store.object("test_file3");
    o3.write("Hello, object3!").await?;

    o3.read().await?;
    o3.range_read(0..5).await?;

    assert_cache_files(
        &cache_store,
        &[
            "test_file1.cache-bytes=0-",
            "test_file3.cache-bytes=0-",
            "test_file3.cache-bytes=0-4",
        ],
        &["Hello, object1!", "Hello, object3!", "Hello"],
    )
    .await?;

    Ok(())
}
Loading