Skip to content

Commit

Permalink
[feature] persistent layer implementation
Browse files Browse the repository at this point in the history
- Refactor WriteOperation implementation
- add a default storage setting
- add error translation method

Close #183
  • Loading branch information
Phoenix500526 committed Mar 7, 2023
1 parent ecfeb8a commit 7dbf926
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 127 deletions.
81 changes: 31 additions & 50 deletions engine/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,71 +5,52 @@ use crate::error::EngineError;
#[derive(Debug)]
pub enum WriteOperation<'a> {
/// `Put` operation
Put(Put<'a>),
Put {
/// The table name
table: &'a str,
/// Key
key: Vec<u8>,
/// Value
value: Vec<u8>,
},
/// `Delete` operation
Delete(Delete<'a>),
/// `DeleteRange` operation
DeleteRange(DeleteRange<'a>),
Delete {
/// The table name
table: &'a str,
/// The target key
key: &'a [u8],
},
/// Delete range operation, it will remove the database entries in the range [from, to)
DeleteRange {
/// The table name
table: &'a str,
/// The `from` key
from: &'a [u8],
/// The `to` key
to: &'a [u8],
},
}

/// Put operation
#[derive(Debug)]
pub struct Put<'a> {
/// The table name
pub(crate) table: &'a str,
/// Key
pub(crate) key: Vec<u8>,
/// Value
pub(crate) value: Vec<u8>,
}

impl<'a> Put<'a> {
impl<'a> WriteOperation<'a> {
/// Create a new `Put` operation
#[inline]
#[must_use]
pub fn new(table: &'a str, key: Vec<u8>, value: Vec<u8>) -> Put<'a> {
Put { table, key, value }
pub fn new_put(table: &'a str, key: Vec<u8>, value: Vec<u8>) -> Self {
Self::Put { table, key, value }
}
}

/// Delete operation,
#[allow(dead_code)]
#[derive(Debug)]
pub struct Delete<'a> {
/// The table name
pub(crate) table: &'a str,
/// The target key
pub(crate) key: &'a [u8],
}

impl<'a> Delete<'a> {
/// Create a new `Delete` operation
#[inline]
#[must_use]
pub fn new(table: &'a str, key: &'a [u8]) -> Delete<'a> {
Delete { table, key }
pub fn new_delete(table: &'a str, key: &'a [u8]) -> Self {
Self::Delete { table, key }
}
}

/// Delete range operation, it will remove the database
/// entries in the range [from, to)
#[allow(dead_code)]
#[derive(Debug)]
pub struct DeleteRange<'a> {
/// The table name
pub(crate) table: &'a str,
/// The `from` key
pub(crate) from: &'a [u8],
/// The `to` key
pub(crate) to: &'a [u8],
}

impl<'a> DeleteRange<'a> {
/// Create a new `DeleteRange` operation
#[inline]
#[allow(dead_code)]
pub(crate) fn new(table: &'a str, from: &'a [u8], to: &'a [u8]) -> DeleteRange<'a> {
DeleteRange { table, from, to }
#[inline]
pub(crate) fn new_delete_range(table: &'a str, from: &'a [u8], to: &'a [u8]) -> Self {
Self::DeleteRange { table, from, to }
}
}

Expand Down
2 changes: 1 addition & 1 deletion engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum EngineError {
/// Met I/O Error during persisting data
#[error("I/O Error")]
#[error("I/O Error: {0}")]
IoError(String),
/// Table Not Found
#[error("Table {0} Not Found")]
Expand Down
2 changes: 1 addition & 1 deletion engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,4 @@ pub mod memory_engine;
/// `RocksDB` Storage Engine
pub mod rocksdb_engine;

pub use self::engine_api::{Delete, DeleteRange, Put, StorageEngine, WriteOperation};
pub use self::engine_api::{StorageEngine, WriteOperation};
29 changes: 12 additions & 17 deletions engine/src/memory_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{cmp::Ordering, collections::HashMap, sync::Arc};
use parking_lot::RwLock;

use crate::{
engine_api::{Delete, DeleteRange, Put, StorageEngine, WriteOperation},
engine_api::{StorageEngine, WriteOperation},
error::EngineError,
};

Expand Down Expand Up @@ -67,19 +67,19 @@ impl StorageEngine for MemoryEngine {
let mut inner = self.inner.write();
for op in wr_ops {
match op {
WriteOperation::Put(Put { table, key, value }) => {
WriteOperation::Put { table, key, value } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.insert(key, value);
}
WriteOperation::Delete(Delete { table, key }) => {
WriteOperation::Delete { table, key } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
let _ignore = table.remove(key);
}
WriteOperation::DeleteRange(DeleteRange { table, from, to }) => {
WriteOperation::DeleteRange { table, from, to } => {
let table = inner
.get_mut(table)
.ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
Expand All @@ -106,26 +106,24 @@ mod test {
use std::iter::{repeat, zip};

use super::*;
use crate::engine_api::Put;

const TESTTABLES: [&'static str; 3] = ["kv", "lease", "auth"];

#[test]
fn write_batch_into_a_non_existing_table_should_fail() {
let engine = MemoryEngine::new(&TESTTABLES).unwrap();

let put = WriteOperation::Put(Put::new(
let put = WriteOperation::new_put(
"hello",
"hello".as_bytes().to_vec(),
"world".as_bytes().to_vec(),
));
);
assert!(engine.write_batch(vec![put], false).is_err());

let delete = WriteOperation::Delete(Delete::new("hello", b"hello"));
let delete = WriteOperation::new_delete("hello", b"hello");
assert!(engine.write_batch(vec![delete], false).is_err());

let delete_range =
WriteOperation::DeleteRange(DeleteRange::new("hello", b"hello", b"world"));
let delete_range = WriteOperation::new_delete_range("hello", b"hello", b"world");
assert!(engine.write_batch(vec![delete_range], false).is_err());
}

Expand All @@ -138,7 +136,7 @@ mod test {
let keys = origin_set.clone();
let values = origin_set.clone();
let puts = zip(keys, values)
.map(|(k, v)| WriteOperation::Put(Put::new("kv", k, v)))
.map(|(k, v)| WriteOperation::new_put("kv", k, v))
.collect::<Vec<WriteOperation<'_>>>();

assert!(engine.write_batch(puts, false).is_ok());
Expand All @@ -147,7 +145,7 @@ mod test {
assert_eq!(res_1.iter().filter(|v| v.is_some()).count(), 10);

let delete_key: Vec<u8> = vec![1, 1, 1, 1];
let delete = WriteOperation::Delete(Delete::new("kv", delete_key.as_slice()));
let delete = WriteOperation::new_delete("kv", delete_key.as_slice());

let res_2 = engine.write_batch(vec![delete], false);
assert!(res_2.is_ok());
Expand All @@ -157,11 +155,8 @@ mod test {

let delete_start: Vec<u8> = vec![2, 2, 2, 2];
let delete_end: Vec<u8> = vec![5, 5, 5, 5];
let delete_range = WriteOperation::DeleteRange(DeleteRange::new(
"kv",
delete_start.as_slice(),
&delete_end.as_slice(),
));
let delete_range =
WriteOperation::new_delete_range("kv", delete_start.as_slice(), &delete_end.as_slice());
let res_4 = engine.write_batch(vec![delete_range], false);
assert!(res_4.is_ok());

Expand Down
83 changes: 45 additions & 38 deletions engine/src/rocksdb_engine.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
use std::{iter::repeat, path::PathBuf, sync::Arc};
use std::{iter::repeat, path::Path, sync::Arc};

use rocksdb::{Options, WriteBatchWithTransaction, WriteOptions, DB};
use rocksdb::{Error as RocksError, Options, WriteBatchWithTransaction, WriteOptions, DB};

use crate::{
engine_api::{Delete, DeleteRange, Put, StorageEngine, WriteOperation},
engine_api::{StorageEngine, WriteOperation},
error::EngineError,
};

/// Translate a `RocksError` into a `EngineError`
impl From<RocksError> for EngineError {
#[inline]
fn from(err: RocksError) -> Self {
let err = err.into_string();
if let Some((err_kind, err_msg)) = err.split_once(':') {
match err_kind {
"Corruption" => EngineError::Corruption(err_msg.to_owned()),
"Invalid argument" => {
if let Some(table_name) = err_msg.strip_prefix(" Column family not found: ") {
EngineError::TableNotFound(table_name.to_owned())
} else {
EngineError::InvalidArgument(err_msg.to_owned())
}
}
"IO error" => EngineError::IoError(err_msg.to_owned()),
_ => EngineError::UnderlyingError(err_msg.to_owned()),
}
} else {
EngineError::UnderlyingError(err)
}
}
}

/// `RocksDB` Storage Engine
#[derive(Debug, Clone)]
pub struct RocksEngine {
Expand All @@ -21,16 +45,12 @@ impl RocksEngine {
///
/// Return `EngineError` when DB open failed.
#[inline]
pub fn new(data_dir: &PathBuf, tables: &[&'static str]) -> Result<Self, EngineError> {
pub fn new(data_dir: impl AsRef<Path>, tables: &[&'static str]) -> Result<Self, EngineError> {
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
Ok(Self {
inner: Arc::new(
DB::open_cf(&db_opts, data_dir, tables).map_err(|e| {
EngineError::UnderlyingError(format!("cannot open database: {e}"))
})?,
),
inner: Arc::new(DB::open_cf(&db_opts, data_dir, tables)?),
})
}
}
Expand All @@ -39,10 +59,7 @@ impl StorageEngine for RocksEngine {
#[inline]
fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
if let Some(cf) = self.inner.cf_handle(table) {
Ok(self
.inner
.get_cf(&cf, key)
.map_err(|e| EngineError::IoError(format!("get key from {table} failed: {e}")))?)
Ok(self.inner.get_cf(&cf, key)?)
} else {
Err(EngineError::TableNotFound(table.to_owned()))
}
Expand All @@ -58,12 +75,8 @@ impl StorageEngine for RocksEngine {
self.inner
.multi_get_cf(repeat(&cf).zip(keys.iter()))
.into_iter()
.map(|res| {
res.map_err(|err| {
EngineError::IoError(format!("get key from {table} failed: {err}"))
})
})
.collect::<Result<Vec<_>, _>>()
.map(|res| res.map_err(EngineError::from))
.collect::<Result<Vec<_>, EngineError>>()
} else {
Err(EngineError::TableNotFound(table.to_owned()))
}
Expand All @@ -75,21 +88,21 @@ impl StorageEngine for RocksEngine {

for op in wr_ops {
match op {
WriteOperation::Put(Put { table, key, value }) => {
WriteOperation::Put { table, key, value } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
batch.put_cf(&cf, key, value);
}
WriteOperation::Delete(Delete { table, key }) => {
WriteOperation::Delete { table, key } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
batch.delete_cf(&cf, key);
}
WriteOperation::DeleteRange(DeleteRange { table, from, to }) => {
WriteOperation::DeleteRange { table, from, to } => {
let cf = self
.inner
.cf_handle(table)
Expand All @@ -100,9 +113,7 @@ impl StorageEngine for RocksEngine {
}
let mut opt = WriteOptions::default();
opt.set_sync(sync);
self.inner
.write_opt(batch, &opt)
.map_err(|e| EngineError::UnderlyingError(format!("{e}")))
self.inner.write_opt(batch, &opt).map_err(EngineError::from)
}
}

Expand All @@ -112,7 +123,7 @@ impl StorageEngine for RocksEngine {
///
/// Panic if db destroy failed.
#[cfg(test)]
pub fn destroy(data_dir: &PathBuf) {
pub fn destroy(data_dir: impl AsRef<Path>) {
#[allow(clippy::unwrap_used)]
DB::destroy(&Options::default(), data_dir).unwrap();
}
Expand All @@ -132,18 +143,17 @@ mod test {
let data_dir = PathBuf::from("/tmp/write_batch_into_a_non_existing_table_should_fail");
let engine = RocksEngine::new(&data_dir, &TESTTABLES).unwrap();

let put = WriteOperation::Put(Put::new(
let put = WriteOperation::new_put(
"hello",
"hello".as_bytes().to_vec(),
"world".as_bytes().to_vec(),
));
);
assert!(engine.write_batch(vec![put], false).is_err());

let delete = WriteOperation::Delete(Delete::new("hello", b"hello"));
let delete = WriteOperation::new_delete("hello", b"hello");
assert!(engine.write_batch(vec![delete], false).is_err());

let delete_range =
WriteOperation::DeleteRange(DeleteRange::new("hello", b"hello", b"world"));
let delete_range = WriteOperation::new_delete_range("hello", b"hello", b"world");
assert!(engine.write_batch(vec![delete_range], false).is_err());

drop(engine);
Expand All @@ -160,7 +170,7 @@ mod test {
let keys = origin_set.clone();
let values = origin_set.clone();
let puts = zip(keys, values)
.map(|(k, v)| WriteOperation::Put(Put::new("kv", k, v)))
.map(|(k, v)| WriteOperation::new_put("kv", k, v))
.collect::<Vec<WriteOperation<'_>>>();

assert!(engine.write_batch(puts, false).is_ok());
Expand All @@ -169,7 +179,7 @@ mod test {
assert_eq!(res_1.iter().filter(|v| v.is_some()).count(), 10);

let delete_key: Vec<u8> = vec![1, 1, 1, 1];
let delete = WriteOperation::Delete(Delete::new("kv", delete_key.as_slice()));
let delete = WriteOperation::new_delete("kv", delete_key.as_slice());

let res_2 = engine.write_batch(vec![delete], false);
assert!(res_2.is_ok());
Expand All @@ -179,11 +189,8 @@ mod test {

let delete_start: Vec<u8> = vec![2, 2, 2, 2];
let delete_end: Vec<u8> = vec![5, 5, 5, 5];
let delete_range = WriteOperation::DeleteRange(DeleteRange::new(
"kv",
delete_start.as_slice(),
&delete_end.as_slice(),
));
let delete_range =
WriteOperation::new_delete_range("kv", delete_start.as_slice(), &delete_end.as_slice());
let res_4 = engine.write_batch(vec![delete_range], false);
assert!(res_4.is_ok());

Expand Down
Loading

0 comments on commit 7dbf926

Please sign in to comment.