Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement path cache and refactor gdrive #3975

Merged
merged 26 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ native-tls = ["reqwest/native-tls"]
# Enable vendored native-tls for TLS support
native-tls-vendored = ["reqwest/native-tls-vendored"]

# Enable path cache.
# This is an internal feature, and should not be used by users.
internal-path-cache = ["dep:moka"]

# Enable all layers.
layers-all = [
"layers-chaos",
Expand Down Expand Up @@ -142,7 +146,7 @@ services-gcs = [
"reqsign?/services-google",
"reqsign?/reqwest_request",
]
services-gdrive = []
services-gdrive = ["internal-path-cache"]
services-ghac = []
services-gridfs = ["dep:mongodb"]
services-hdfs = ["dep:hdrs"]
Expand Down
5 changes: 5 additions & 0 deletions core/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ pub use layer::*;
mod path;
pub use path::*;

#[cfg(feature = "internal-path-cache")]
mod path_cache;
#[cfg(feature = "internal-path-cache")]
pub use path_cache::*;

mod operation;
pub use operation::*;

Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::EntryMode;
use crate::*;

/// build_abs_path will build an absolute path with root.
///
Expand Down
144 changes: 144 additions & 0 deletions core/src/raw/path_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::raw::{get_basename, get_parent};
use crate::*;
use async_trait::async_trait;
use moka::sync::Cache;

/// The trait required for path cacher.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait PathQuery {
/// Fetch the id for the root of the service.
async fn root(&self) -> Result<String>;
/// Query the id by parent_id and name.
async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>>;
}

/// PathCacher is a cache for path query.
///
/// OpenDAL is designed for path based storage systemds, such as S3, HDFS, etc. But there are many
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
/// services that are not path based, such as OneDrive, Google Drive, etc. For these services, we
/// lookup files based on id. The lookup of id is very expensive, so we cache the path to id mapping
/// in PathCacher.
///
/// # Behavior
///
/// The `path` in cache is always an absolute. For example, if service root is `/root/`, then the
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
/// path of file `a/b` in cache will be `/root/a/b`.
pub struct PathCacher<Q: PathQuery> {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
query: Q,
cache: Cache<String, String>,
}

impl<Q: PathQuery> PathCacher<Q> {
/// Create a new path cacher.
pub fn new(query: Q) -> Self {
Self {
query,
cache: Cache::new(64 * 1024),
}
}

/// Insert a new cache entry.
pub fn insert(&self, path: &str, id: &str) {
self.cache.insert(path.to_string(), id.to_string());
}

/// Remove a cache entry.
pub fn remove(&self, path: &str) {
self.cache.invalidate(path)
}

/// Get the id for the given path.
pub async fn get(&self, path: &str) -> Result<Option<String>> {
if let Some(id) = self.cache.get(path) {
return Ok(Some(id));
}

let mut paths = vec![];
let mut current_path = path;

while current_path != "/" && !current_path.is_empty() {
paths.push(current_path.to_string());
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
current_path = get_parent(current_path);
if let Some(id) = self.cache.get(current_path) {
return self.query_down(&id, &paths).await;
}
}

let root_id = self.query.root().await?;
self.query_down(&root_id, &paths).await
}

async fn query_down(&self, start_id: &str, paths: &[String]) -> Result<Option<String>> {
let mut current_id = start_id.to_string();
for path in paths.iter().rev() {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
let name = get_basename(path);
current_id = match self.query.query(&current_id, name).await? {
Some(id) => {
self.cache.insert(path.clone(), id.clone());
id
}
None => return Ok(None),
};
}
Ok(Some(current_id))
}
}

#[cfg(test)]
mod tests {
use crate::raw::{PathCacher, PathQuery};
use crate::*;
use async_trait::async_trait;

struct TestQuery {}

#[async_trait]
impl PathQuery for TestQuery {
async fn root(&self) -> Result<String> {
Ok("root/".to_string())
}

async fn query(&self, parent_id: &str, name: &str) -> Result<Option<String>> {
if name.starts_with("not_exist") {
return Ok(None);
}
Ok(Some(format!("{parent_id}{name}")))
}
}

#[tokio::test]
async fn test_path_cacher_get() {
let cases = vec![
("root", "/", Some("root/")),
("normal path", "/a", Some("root/a")),
("not exist normal dir", "/not_exist/a", None),
("not exist normal file", "/a/b/not_exist", None),
("nest path", "/a/b/c/d", Some("root/a/b/c/d")),
];

for (name, input, expect) in cases {
let cache = PathCacher::new(TestQuery {});

let actual = cache.get(input).await.unwrap();
assert_eq!(actual.as_deref(), expect, "{}", name)
}
}
}
99 changes: 27 additions & 72 deletions core/src/services/gdrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,7 @@ impl Accessor for GdriveBackend {
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let parent = self.core.ensure_parent_path(path).await?;

// Make sure `/` has been trimmed.
let path = get_basename(path).trim_end_matches('/');

// As Google Drive allows files have the same name, we need to check if the folder exists.
let folder_id = self.core.gdrive_search_folder(&parent, path).await?;

let id = if let Some(id) = folder_id {
id
} else {
self.core.gdrive_create_folder(&parent, path).await?
};

let mut cache = self.core.path_cache.lock().await;
cache.insert(build_abs_path(&self.core.root, path), id);
let _ = self.core.ensure_parent_path(path).await?;

Ok(RpCreateDir::default())
}
Expand Down Expand Up @@ -161,73 +146,43 @@ impl Accessor for GdriveBackend {
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let resp = self.core.gdrive_delete(path).await;
if let Ok(resp) = resp {
let status = resp.status();

match status {
StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => {
let mut cache = self.core.path_cache.lock().await;

cache.remove(&build_abs_path(&self.core.root, path));

return Ok(RpDelete::default());
}
_ => return Err(parse_error(resp).await?),
}
let path = build_abs_path(&self.core.root, path);
let file_id = self.core.path_cache.get(&path).await?;
let file_id = if let Some(id) = file_id {
id
} else {
return Ok(RpDelete::default());
};

let e = resp.err().unwrap();
if e.kind() == ErrorKind::NotFound {
Ok(RpDelete::default())
} else {
Err(e)
let resp = self.core.gdrive_delete(&file_id).await?;
let status = resp.status();
if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND {
return Err(parse_error(resp).await?);
}

self.core.path_cache.remove(&path);
resp.into_body().consume().await?;
return Ok(RpDelete::default());
}

async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
let l = GdriveLister::new(path.into(), self.core.clone());
let path = build_abs_path(&self.core.root, path);
let l = GdriveLister::new(path, self.core.clone());
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
Ok((RpList::default(), oio::PageLister::new(l)))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let from_file_id = self
.core
.get_file_id_by_path(from)
.await?
.ok_or(Error::new(ErrorKind::NotFound, "invalid 'from' path"))?;

// split `to` into parent and name according to the last `/`
let mut to_path_items: Vec<&str> = to.split('/').filter(|&x| !x.is_empty()).collect();

let to_name = if let Some(name) = to_path_items.pop() {
name
} else {
return Err(Error::new(ErrorKind::InvalidInput, "invalid 'to' path"));
};
let from = build_abs_path(&self.core.root, from);
let from_file_id = self.core.path_cache.get(&from).await?.ok_or(Error::new(
ErrorKind::NotFound,
"the file to be copied is not exist",
))?;

let to_parent = to_path_items.join("/") + "/";

let to_parent_id =
if let Some(id) = self.core.get_file_id_by_path(to_parent.as_str()).await? {
id
} else {
self.create_dir(&to_parent, OpCreateDir::new()).await?;
self.core
.get_file_id_by_path(to_parent.as_str())
.await?
.ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "create to's parent folder failed")
})?
};
let to_name = get_basename(to);
let to_parent_id = self.core.ensure_parent_path(to).await?;

// copy will overwrite `to`, delete it if exist
if self
.core
.get_file_id_by_path(to)
.await
.is_ok_and(|id| id.is_some())
{
if self.core.path_cache.get(to).await?.is_some() {
self.delete(to, OpDelete::new()).await?;
}

Expand Down Expand Up @@ -266,10 +221,10 @@ impl Accessor for GdriveBackend {
let meta = serde_json::from_slice::<GdriveFile>(&body)
.map_err(new_json_deserialize_error)?;

let mut cache = self.core.path_cache.lock().await;
let cache = &self.core.path_cache;

cache.remove(&build_abs_path(&self.core.root, from));
cache.insert(build_abs_path(&self.core.root, to), meta.id.clone());
cache.insert(&build_abs_path(&self.core.root, to), &meta.id);

Ok(RpRename::default())
}
Expand Down
33 changes: 16 additions & 17 deletions core/src/services/gdrive/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use log::debug;
use tokio::sync::Mutex;

use super::backend::GdriveBackend;
use crate::raw::normalize_root;
use crate::raw::HttpClient;
use crate::services::gdrive::core::GdriveCore;
use crate::raw::{normalize_root, PathCacher};
use crate::services::gdrive::core::GdriveSigner;
use crate::services::gdrive::core::{GdriveCore, GdrivePathQuery};
use crate::Scheme;
use crate::*;

Expand Down Expand Up @@ -144,13 +144,13 @@ impl Builder for GdriveBuilder {
})?
};

let signer = match (self.access_token.take(), self.refresh_token.take()) {
(Some(access_token), None) => GdriveSigner {
access_token,
let mut signer = GdriveSigner::new(client.clone());
match (self.access_token.take(), self.refresh_token.take()) {
(Some(access_token), None) => {
signer.access_token = access_token;
// We will never expire user specified access token.
expires_in: DateTime::<Utc>::MAX_UTC,
..Default::default()
},
signer.expires_in = DateTime::<Utc>::MAX_UTC;
}
(None, Some(refresh_token)) => {
let client_id = self.client_id.take().ok_or_else(|| {
Error::new(
Expand All @@ -167,12 +167,10 @@ impl Builder for GdriveBuilder {
.with_context("service", Scheme::Gdrive)
})?;

GdriveSigner {
refresh_token,
client_id,
client_secret,
..Default::default()
}
signer.refresh_token = refresh_token;
signer.client = client.clone();
signer.client_id = client_id;
signer.client_secret = client_secret;
}
(Some(_), Some(_)) => {
return Err(Error::new(
Expand All @@ -190,12 +188,13 @@ impl Builder for GdriveBuilder {
}
};

let signer = Arc::new(Mutex::new(signer));
Ok(GdriveBackend {
core: Arc::new(GdriveCore {
root,
signer: Arc::new(Mutex::new(signer)),
client,
path_cache: Arc::default(),
signer: signer.clone(),
client: client.clone(),
path_cache: PathCacher::new(GdrivePathQuery::new(client, signer)),
}),
})
}
Expand Down
Loading
Loading