Skip to content

Commit

Permalink
feat: Add logic to FsDataStore for K/V storage. (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Apr 19, 2024
1 parent f023d89 commit 0ebb6e6
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- feat: Add options to disable dns transport.
- refactor: Spawn local task for idb operation instead of using channels. [PR 182](https://github.com/dariusc93/rust-ipfs/pull/182)
- fix: Improve performance by collecting the pinned blocks then compare.
- feat: Add logic to FsDataStore for K/V storage. [PR 183](https://github.com/dariusc93/rust-ipfs/pull/183)

# 0.11.6
- feat: Add RepoInsertPin::provider and RepoInsertPin::providers. [PR 180](https://github.com/dariusc93/rust-ipfs/pull/180)
Expand Down
2 changes: 1 addition & 1 deletion src/ipns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl Ipns {

let datastore = repo.data_store();

let record_data = datastore.get(mb.as_bytes()).await?;
let record_data = datastore.get(mb.as_bytes()).await.unwrap_or_default();

let mut seq = 0;

Expand Down
8 changes: 4 additions & 4 deletions src/repo/blockstore/idb.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{rc::Rc, str::FromStr, sync::OnceLock};

use async_trait::async_trait;
use futures::{channel::oneshot, stream::BoxStream, SinkExt, StreamExt};
use idb::{Database, DatabaseEvent, Factory, ObjectStoreParams, TransactionMode};
use libipld::Cid;
use crate::{
repo::{BlockPut, BlockStore},
Block, Error,
};
use async_trait::async_trait;
use futures::{channel::oneshot, stream::BoxStream, SinkExt, StreamExt};
use idb::{Database, DatabaseEvent, Factory, ObjectStoreParams, TransactionMode};
use libipld::Cid;
use send_wrapper::SendWrapper;
use wasm_bindgen_futures::wasm_bindgen::JsValue;

Expand Down
198 changes: 185 additions & 13 deletions src/repo/datastore/flatfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use crate::repo::paths::{filestem_to_pin_cid, pin_path};
use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore, References};
use async_trait::async_trait;
use core::convert::TryFrom;
use futures::stream::TryStreamExt;
use futures::stream::{BoxStream, TryStreamExt};
use futures::StreamExt;
use libipld::Cid;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::Semaphore;
use tokio::sync::{RwLock, Semaphore};
use tokio_stream::{empty, wrappers::ReadDirStream};
use tokio_util::either::Either;

Expand All @@ -34,15 +34,145 @@ pub struct FsDataStore {
/// collection implementation, it might be needed to hold this permit for the duration of
/// garbage collection, or something similar.
lock: Arc<Semaphore>,

ds_guard: Arc<RwLock<()>>,
}

impl FsDataStore {
pub fn new(root: PathBuf) -> Self {
FsDataStore {
path: root,
ds_guard: Arc::default(),
lock: Arc::new(Semaphore::new(1)),
}
}

// Instead of having the file be the key itself, we would split the key into segements with all but the last representing as a directory
// with the final item being a file.
fn key(&self, key: &[u8]) -> Option<(String, String)> {
let key = String::from_utf8_lossy(key);
let mut key_segments = key.split('/').collect::<Vec<_>>();

let key_val = key_segments
.pop()
.map(PathBuf::from)
.map(|path| path.with_extension("data"))
.map(|path| path.to_string_lossy().to_string())?;

let key_path_raw = key_segments.join("/");

let key_path = match key_path_raw.starts_with('/') {
true => key_path_raw[1..].to_string(),
false => key_path_raw,
};

Some((key_path, key_val))
}

async fn write(&self, key: &[u8], val: &[u8]) -> std::io::Result<()> {
let data_path = self.path.join("data");
if !data_path.is_dir() {
tokio::fs::create_dir_all(&data_path).await?;
}

let (path, key) = self
.key(key)
.ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;

let path = data_path.join(path);

if !path.is_dir() {
tokio::fs::create_dir_all(&path).await?;
}

let path = path.join(key);

if path.is_dir() {
// The only reason why this would be a directory is if the key didnt exist, is invalid, or the item was
// actually a directory in which case we return an error here.
return Err(std::io::ErrorKind::Other.into());
}

tokio::fs::write(path, val).await
}

fn _contains(&self, key: &[u8]) -> bool {
let data_path = self.path.join("data");
let Some((path, key)) = self.key(key) else {
return false;
};
let path = data_path.join(path);
let path = path.join(key);
path.is_file()
}

async fn delete(&self, key: &[u8]) -> std::io::Result<()> {
let data_path = self.path.join("data");
let (path, key) = self
.key(key)
.ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
let path = data_path.join(path);
let path = path.join(key);
tokio::fs::remove_file(path).await
}

async fn read(&self, key: &[u8]) -> std::io::Result<Option<Vec<u8>>> {
let data_path = self.path.join("data");
let (path, key) = self
.key(key)
.ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
let path = data_path.join(path);
let path = path.join(key);
if path.is_dir() {
return Ok(None);
}
tokio::fs::read(path).await.map(Some)
}
}

fn build_kv<R: AsRef<Path>, P: AsRef<Path>>(
data_path: R,
path: P,
) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
let data_path = data_path.as_ref().to_path_buf();
let path = path.as_ref().to_path_buf();
async_stream::stream! {
if path.is_file() {
return;
}
let Ok(dir) = tokio::fs::read_dir(path).await else {
return;
};

let st =
ReadDirStream::new(dir).filter_map(|result| futures::future::ready(result.ok()));

for await entry in st {
let path = entry.path();
if path.is_dir() {
for await item in build_kv(&data_path, &path) {
yield item;
}
} else {
let root_str = data_path.to_string_lossy().to_string();
let path_str = path.to_string_lossy().to_string();
let raw_key = &path_str[root_str.len()..];
if raw_key.is_empty() {
continue;
}

let Some(key) = raw_key.get(0..raw_key.len() - 5) else {
continue;
};

if let Ok(bytes) = tokio::fs::read(path).await {
let key = key.as_bytes().to_vec();
yield (key, bytes)
}
}
}
}
.boxed()
}

/// The column operations are all unimplemented pending at least downscoping of the
Expand All @@ -52,31 +182,37 @@ impl DataStore for FsDataStore {
async fn init(&self) -> Result<(), Error> {
// Although `pins` directory is created when inserting a data, is it not created when there are any attempts at listing the pins (thus causing to fail)
tokio::fs::create_dir_all(&self.path.join("pins")).await?;
tokio::fs::create_dir_all(&self.path.join("data")).await?;
Ok(())
}

async fn open(&self) -> Result<(), Error> {
Ok(())
}

async fn contains(&self, _key: &[u8]) -> Result<bool, Error> {
Err(anyhow::anyhow!("not implemented"))
async fn contains(&self, key: &[u8]) -> Result<bool, Error> {
let _g = self.ds_guard.read().await;
Ok(self._contains(key))
}

async fn get(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
Err(anyhow::anyhow!("not implemented"))
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let _g = self.ds_guard.read().await;
self.read(key).await.map_err(Error::from)
}

async fn put(&self, _key: &[u8], _value: &[u8]) -> Result<(), Error> {
Err(anyhow::anyhow!("not implemented"))
async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
let _g = self.ds_guard.write().await;
self.write(key, value).await.map_err(Error::from)
}

async fn remove(&self, _key: &[u8]) -> Result<(), Error> {
Err(anyhow::anyhow!("not implemented"))
async fn remove(&self, key: &[u8]) -> Result<(), Error> {
let _g = self.ds_guard.write().await;
self.delete(key).await.map_err(Error::from)
}

async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)> {
futures::stream::empty().boxed()
async fn iter(&self) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
let data_path = self.path.join("data");
build_kv(&data_path, &data_path)
}
}

Expand Down Expand Up @@ -631,3 +767,39 @@ crate::pinstore_interface_tests!(
common_tests,
crate::repo::datastore::flatfs::FsDataStore::new
);

#[cfg(test)]
mod test {
use crate::repo::{datastore::flatfs::FsDataStore, DataStore};

#[tokio::test]
async fn test_kv_datastore() -> anyhow::Result<()> {
let tmp = std::env::temp_dir();
let store = FsDataStore::new(tmp.clone());
let key = [1, 2, 3, 4];
let value = [5, 6, 7, 8];

store.init().await?;
store.open().await?;

let contains = store.contains(&key).await.unwrap();
assert!(!contains);
let get = store.get(&key).await.unwrap_or_default();
assert_eq!(get, None);
assert!(store.remove(&key).await.is_err());

store.put(&key, &value).await.unwrap();
let contains = store.contains(&key).await.unwrap();
assert!(contains);
let get = store.get(&key).await.unwrap();
assert_eq!(get, Some(value.to_vec()));

store.remove(&key).await.unwrap();
let contains = store.contains(&key).await.unwrap();
assert!(!contains);
let get = store.get(&key).await.unwrap_or_default();
assert_eq!(get, None);
drop(store);
Ok(())
}
}

0 comments on commit 0ebb6e6

Please sign in to comment.