Skip to content

Commit

Permalink
fix: remove path label for cache store (#4336)
Browse files Browse the repository at this point in the history
* fix: remove path label for cache store

* fix: ignore path label for intermediatemanager

* refactor: remove unused object store
  • Loading branch information
sunng87 authored Jul 15, 2024
1 parent 4b8b04f commit b8bd845
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/common/datasource/src/object_store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
.expect("input error level must be valid"),
)
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish();
Ok(object_store)
}
2 changes: 1 addition & 1 deletion src/common/datasource/src/object_store/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub fn build_s3_backend(
.expect("input error level must be valid"),
)
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish())
}

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) async fn new_object_store(
object_store
};

let store = with_instrument_layers(object_store);
let store = with_instrument_layers(object_store, true);
Ok(store)
}

Expand Down
9 changes: 3 additions & 6 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,15 @@ pub(crate) struct SstWriteRequest {
pub(crate) fulltext_index_config: FulltextIndexConfig,
}

/// Creates a fs object store with atomic write dir.
pub(crate) async fn new_fs_object_store(root: &str) -> Result<ObjectStore> {
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
let atomic_write_dir = join_dir(root, ".tmp/");
clean_dir(&atomic_write_dir).await?;

let mut builder = Fs::default();
builder.root(root).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();

// Add layers.
let object_store = with_instrument_layers(object_store);
Ok(object_store)
Ok(with_instrument_layers(store, false))
}

/// Clean the directory.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::access_layer::{new_fs_object_store, SstWriteRequest};
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
Expand Down Expand Up @@ -86,7 +86,7 @@ impl WriteCache {
) -> Result<Self> {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");

let local_store = new_fs_object_store(cache_dir).await?;
let local_store = new_fs_cache_store(cache_dir).await?;
Self::new(
local_store,
object_store_manager,
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/sst/index/intermediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use object_store::util::{self, normalize_dir};
use store_api::storage::{ColumnId, RegionId};
use uuid::Uuid;

use crate::access_layer::new_fs_object_store;
use crate::access_layer::new_fs_cache_store;
use crate::error::Result;
use crate::sst::file::FileId;
use crate::sst::index::store::InstrumentedStore;
Expand All @@ -37,7 +37,7 @@ impl IntermediateManager {
/// Create a new `IntermediateManager` with the given root path.
/// It will clean up all garbage intermediate files from previous runs.
pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
let store = new_fs_object_store(&normalize_dir(aux_path.as_ref())).await?;
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
let store = InstrumentedStore::new(store);

// Remove all garbage intermediate files from previous runs.
Expand Down
48 changes: 34 additions & 14 deletions src/object-store/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,15 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) {
///
/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration.
#[derive(Default, Debug, Clone)]
pub struct PrometheusMetricsLayer;
pub struct PrometheusMetricsLayer {
pub path_label: bool,
}

impl PrometheusMetricsLayer {
pub fn new(path_label: bool) -> Self {
Self { path_label }
}
}

impl<A: Access> Layer<A> for PrometheusMetricsLayer {
type LayeredAccess = PrometheusAccess<A>;
Expand All @@ -96,6 +104,7 @@ impl<A: Access> Layer<A> for PrometheusMetricsLayer {
PrometheusAccess {
inner,
scheme: scheme.to_string(),
path_label: self.path_label,
}
}
}
Expand All @@ -104,6 +113,17 @@ impl<A: Access> Layer<A> for PrometheusMetricsLayer {
pub struct PrometheusAccess<A: Access> {
inner: A,
scheme: String,
path_label: bool,
}

impl<A: Access> PrometheusAccess<A> {
fn get_path_label<'a>(&self, path: &'a str) -> &'a str {
if self.path_label {
extract_parent_path(path)
} else {
""
}
}
}

impl<A: Access> Debug for PrometheusAccess<A> {
Expand All @@ -128,7 +148,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
.inc();
Expand All @@ -146,7 +166,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
.inc();
Expand Down Expand Up @@ -176,7 +196,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
.inc();
Expand Down Expand Up @@ -206,7 +226,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
.inc();
Expand All @@ -223,7 +243,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
.inc();
Expand All @@ -241,7 +261,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
.inc();
Expand Down Expand Up @@ -277,7 +297,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
.inc();
Expand All @@ -295,7 +315,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Expand All @@ -322,7 +342,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Expand Down Expand Up @@ -363,7 +383,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Expand Down Expand Up @@ -404,7 +424,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Expand All @@ -429,7 +449,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Expand All @@ -455,7 +475,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
Expand Down
4 changes: 2 additions & 2 deletions src/object-store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub(crate) fn extract_parent_path(path: &str) -> &str {
}

/// Attaches instrument layers to the object store.
pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore {
pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
object_store
.layer(
LoggingLayer::default()
Expand All @@ -148,7 +148,7 @@ pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore {
.expect("input error level must be valid"),
)
.layer(TracingLayer)
.layer(PrometheusMetricsLayer)
.layer(PrometheusMetricsLayer::new(path_label))
}

#[cfg(test)]
Expand Down

0 comments on commit b8bd845

Please sign in to comment.