diff --git a/engine/src/engine_api.rs b/engine/src/engine_api.rs
index 181484b7dd..01d055fccb 100644
--- a/engine/src/engine_api.rs
+++ b/engine/src/engine_api.rs
@@ -5,71 +5,52 @@ use crate::error::EngineError;
 #[derive(Debug)]
 pub enum WriteOperation<'a> {
     /// `Put` operation
-    Put(Put<'a>),
+    Put {
+        /// The table name
+        table: &'a str,
+        /// Key
+        key: Vec<u8>,
+        /// Value
+        value: Vec<u8>,
+    },
     /// `Delete` operation
-    Delete(Delete<'a>),
-    /// `DeleteRange` operation
-    DeleteRange(DeleteRange<'a>),
+    Delete {
+        /// The table name
+        table: &'a str,
+        /// The target key
+        key: &'a [u8],
+    },
+    /// Delete range operation; it removes the database entries in the range [from, to)
+    DeleteRange {
+        /// The table name
+        table: &'a str,
+        /// The `from` key
+        from: &'a [u8],
+        /// The `to` key
+        to: &'a [u8],
+    },
 }
 
-/// Put operation
-#[derive(Debug)]
-pub struct Put<'a> {
-    /// The table name
-    pub(crate) table: &'a str,
-    /// Key
-    pub(crate) key: Vec<u8>,
-    /// Value
-    pub(crate) value: Vec<u8>,
-}
-
-impl<'a> Put<'a> {
+impl<'a> WriteOperation<'a> {
     /// Create a new `Put` operation
     #[inline]
     #[must_use]
-    pub fn new(table: &'a str, key: Vec<u8>, value: Vec<u8>) -> Put<'a> {
-        Put { table, key, value }
+    pub fn new_put(table: &'a str, key: Vec<u8>, value: Vec<u8>) -> Self {
+        Self::Put { table, key, value }
     }
-}
-
-/// Delete operation,
-#[allow(dead_code)]
-#[derive(Debug)]
-pub struct Delete<'a> {
-    /// The table name
-    pub(crate) table: &'a str,
-    /// The target key
-    pub(crate) key: &'a [u8],
-}
 
-impl<'a> Delete<'a> {
     /// Create a new `Delete` operation
     #[inline]
     #[must_use]
-    pub fn new(table: &'a str, key: &'a [u8]) -> Delete<'a> {
-        Delete { table, key }
+    pub fn new_delete(table: &'a str, key: &'a [u8]) -> Self {
+        Self::Delete { table, key }
     }
-}
-
-/// Delete range operation, it will remove the database
-/// entries in the range [from, to)
-#[allow(dead_code)]
-#[derive(Debug)]
-pub struct DeleteRange<'a> {
-    /// The table name
-    pub(crate) table: &'a str,
-    /// The `from` key
-    pub(crate) from: &'a [u8],
-    /// The `to` key
-    pub(crate) to: &'a [u8],
-}
 
-impl<'a> DeleteRange<'a> {
     /// Create a new `DeleteRange` operation
-    #[inline]
     #[allow(dead_code)]
-    pub(crate) fn new(table: &'a str, from: &'a [u8], to: &'a [u8]) -> DeleteRange<'a> {
-        DeleteRange { table, from, to }
+    #[inline]
+    pub(crate) fn new_delete_range(table: &'a str, from: &'a [u8], to: &'a [u8]) -> Self {
+        Self::DeleteRange { table, from, to }
     }
 }
diff --git a/engine/src/error.rs b/engine/src/error.rs
index dfd0f1b721..1a645c8748 100644
--- a/engine/src/error.rs
+++ b/engine/src/error.rs
@@ -6,7 +6,7 @@ use thiserror::Error;
 #[derive(Error, Debug)]
 pub enum EngineError {
     /// Met I/O Error during persisting data
-    #[error("I/O Error")]
+    #[error("I/O Error: {0}")]
     IoError(String),
     /// Table Not Found
     #[error("Table {0} Not Found")]
diff --git a/engine/src/lib.rs b/engine/src/lib.rs
index e8f85f1442..30af51e227 100644
--- a/engine/src/lib.rs
+++ b/engine/src/lib.rs
@@ -155,4 +155,4 @@ pub mod memory_engine;
 /// `RocksDB` Storage Engine
 pub mod rocksdb_engine;
 
-pub use self::engine_api::{Delete, DeleteRange, Put, StorageEngine, WriteOperation};
+pub use self::engine_api::{StorageEngine, WriteOperation};
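Not part of the patch, but a sketch of what call sites look like after this refactor: the one-step constructors replace the old two-step `WriteOperation::Put(Put::new(..))` pattern, and the struct variants destructure directly. Note that `new_delete_range` stays `pub(crate)`, so only `Put` and `Delete` operations can be built from outside the `engine` crate.

```rust
use engine::WriteOperation;

fn main() {
    // One constructor call instead of wrapping an inner struct in the enum.
    let put = WriteOperation::new_put("kv", b"hello".to_vec(), b"world".to_vec());
    let del = WriteOperation::new_delete("kv", b"hello");

    for op in [put, del] {
        // Struct variants pattern-match without an inner type to unpack.
        match op {
            WriteOperation::Put { table, key, value } => {
                println!("put {table}: {key:?} => {value:?}");
            }
            WriteOperation::Delete { table, key } => {
                println!("delete {table}: {key:?}");
            }
            WriteOperation::DeleteRange { table, from, to } => {
                println!("delete range {table}: [{from:?}, {to:?})");
            }
        }
    }
}
```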
diff --git a/engine/src/memory_engine.rs b/engine/src/memory_engine.rs
index 3e9e893e19..11734519fe 100644
--- a/engine/src/memory_engine.rs
+++ b/engine/src/memory_engine.rs
@@ -3,7 +3,7 @@
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
 
 use parking_lot::RwLock;
 
 use crate::{
-    engine_api::{Delete, DeleteRange, Put, StorageEngine, WriteOperation},
+    engine_api::{StorageEngine, WriteOperation},
     error::EngineError,
 };
 
@@ -67,19 +67,19 @@ impl StorageEngine for MemoryEngine {
         let mut inner = self.inner.write();
         for op in wr_ops {
             match op {
-                WriteOperation::Put(Put { table, key, value }) => {
+                WriteOperation::Put { table, key, value } => {
                     let table = inner
                         .get_mut(table)
                         .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
                     let _ignore = table.insert(key, value);
                 }
-                WriteOperation::Delete(Delete { table, key }) => {
+                WriteOperation::Delete { table, key } => {
                     let table = inner
                         .get_mut(table)
                         .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
                     let _ignore = table.remove(key);
                 }
-                WriteOperation::DeleteRange(DeleteRange { table, from, to }) => {
+                WriteOperation::DeleteRange { table, from, to } => {
                     let table = inner
                         .get_mut(table)
                         .ok_or_else(|| EngineError::TableNotFound(table.to_owned()))?;
@@ -106,7 +106,6 @@ mod test {
     use std::iter::{repeat, zip};
 
     use super::*;
-    use crate::engine_api::Put;
 
     const TESTTABLES: [&'static str; 3] = ["kv", "lease", "auth"];
 
@@ -114,18 +113,17 @@ fn write_batch_into_a_non_existing_table_should_fail() {
         let engine = MemoryEngine::new(&TESTTABLES).unwrap();
 
-        let put = WriteOperation::Put(Put::new(
+        let put = WriteOperation::new_put(
             "hello",
             "hello".as_bytes().to_vec(),
             "world".as_bytes().to_vec(),
-        ));
+        );
         assert!(engine.write_batch(vec![put], false).is_err());
 
-        let delete = WriteOperation::Delete(Delete::new("hello", b"hello"));
+        let delete = WriteOperation::new_delete("hello", b"hello");
         assert!(engine.write_batch(vec![delete], false).is_err());
 
-        let delete_range =
-            WriteOperation::DeleteRange(DeleteRange::new("hello", b"hello", b"world"));
+        let delete_range = WriteOperation::new_delete_range("hello", b"hello", b"world");
         assert!(engine.write_batch(vec![delete_range], false).is_err());
     }
 
@@ -138,7 +136,7 @@
         let keys = origin_set.clone();
         let values = origin_set.clone();
         let puts = zip(keys, values)
-            .map(|(k, v)| WriteOperation::Put(Put::new("kv", k, v)))
+            .map(|(k, v)| WriteOperation::new_put("kv", k, v))
             .collect::<Vec<WriteOperation<'_>>>();
 
         assert!(engine.write_batch(puts, false).is_ok());
 
@@ -147,7 +145,7 @@
         assert_eq!(res_1.iter().filter(|v| v.is_some()).count(), 10);
 
         let delete_key: Vec<u8> = vec![1, 1, 1, 1];
-        let delete = WriteOperation::Delete(Delete::new("kv", delete_key.as_slice()));
+        let delete = WriteOperation::new_delete("kv", delete_key.as_slice());
         let res_2 = engine.write_batch(vec![delete], false);
         assert!(res_2.is_ok());
 
@@ -157,11 +155,8 @@
         let delete_start: Vec<u8> = vec![2, 2, 2, 2];
         let delete_end: Vec<u8> = vec![5, 5, 5, 5];
-        let delete_range = WriteOperation::DeleteRange(DeleteRange::new(
-            "kv",
-            delete_start.as_slice(),
-            &delete_end.as_slice(),
-        ));
+        let delete_range =
+            WriteOperation::new_delete_range("kv", delete_start.as_slice(), &delete_end.as_slice());
         let res_4 = engine.write_batch(vec![delete_range], false);
         assert!(res_4.is_ok());
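Again illustrative rather than part of the patch: a test one could drop into `memory_engine.rs`'s `mod test` (where the crate-private `new_delete_range` is callable and `use super::*;` is in scope) to pin down the half-open `[from, to)` semantics documented on the `DeleteRange` variant.

```rust
#[test]
fn delete_range_should_be_half_open() {
    // "kv" matches one of the TESTTABLES names used above.
    let engine = MemoryEngine::new(&["kv"]).unwrap();

    // Insert keys [1], [2], [3], [4].
    let puts = (1u8..=4)
        .map(|k| WriteOperation::new_put("kv", vec![k], vec![k]))
        .collect::<Vec<WriteOperation<'_>>>();
    assert!(engine.write_batch(puts, false).is_ok());

    // Removes [2] and [3]; the `to` bound [4] itself should survive.
    let dr = WriteOperation::new_delete_range("kv", &[2], &[4]);
    assert!(engine.write_batch(vec![dr], false).is_ok());

    assert!(engine.get("kv", [1]).unwrap().is_some());
    assert!(engine.get("kv", [2]).unwrap().is_none());
    assert!(engine.get("kv", [3]).unwrap().is_none());
    assert!(engine.get("kv", [4]).unwrap().is_some());
}
```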
diff --git a/engine/src/rocksdb_engine.rs b/engine/src/rocksdb_engine.rs
index 488bd4b851..cecad8c26b 100644
--- a/engine/src/rocksdb_engine.rs
+++ b/engine/src/rocksdb_engine.rs
@@ -1,12 +1,36 @@
-use std::{iter::repeat, path::PathBuf, sync::Arc};
+use std::{iter::repeat, path::Path, sync::Arc};
 
-use rocksdb::{Options, WriteBatchWithTransaction, WriteOptions, DB};
+use rocksdb::{Error as RocksError, Options, WriteBatchWithTransaction, WriteOptions, DB};
 
 use crate::{
-    engine_api::{Delete, DeleteRange, Put, StorageEngine, WriteOperation},
+    engine_api::{StorageEngine, WriteOperation},
     error::EngineError,
 };
 
+/// Translate a `RocksError` into an `EngineError`
+impl From<RocksError> for EngineError {
+    #[inline]
+    fn from(err: RocksError) -> Self {
+        let err = err.into_string();
+        if let Some((err_kind, err_msg)) = err.split_once(':') {
+            match err_kind {
+                "Corruption" => EngineError::Corruption(err_msg.to_owned()),
+                "Invalid argument" => {
+                    if let Some(table_name) = err_msg.strip_prefix(" Column family not found: ") {
+                        EngineError::TableNotFound(table_name.to_owned())
+                    } else {
+                        EngineError::InvalidArgument(err_msg.to_owned())
+                    }
+                }
+                "IO error" => EngineError::IoError(err_msg.to_owned()),
+                _ => EngineError::UnderlyingError(err_msg.to_owned()),
+            }
+        } else {
+            EngineError::UnderlyingError(err)
+        }
+    }
+}
+
 /// `RocksDB` Storage Engine
 #[derive(Debug, Clone)]
 pub struct RocksEngine {
@@ -21,16 +45,12 @@ impl RocksEngine {
     ///
     /// Return `EngineError` when DB open failed.
     #[inline]
-    pub fn new(data_dir: &PathBuf, tables: &[&'static str]) -> Result<Self, EngineError> {
+    pub fn new(data_dir: impl AsRef<Path>, tables: &[&'static str]) -> Result<Self, EngineError> {
         let mut db_opts = Options::default();
         db_opts.create_missing_column_families(true);
         db_opts.create_if_missing(true);
         Ok(Self {
-            inner: Arc::new(
-                DB::open_cf(&db_opts, data_dir, tables).map_err(|e| {
-                    EngineError::UnderlyingError(format!("cannot open database: {e}"))
-                })?,
-            ),
+            inner: Arc::new(DB::open_cf(&db_opts, data_dir, tables)?),
         })
     }
 }
 
@@ -39,10 +59,7 @@ impl StorageEngine for RocksEngine {
     #[inline]
     fn get(&self, table: &str, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>, EngineError> {
         if let Some(cf) = self.inner.cf_handle(table) {
-            Ok(self
-                .inner
-                .get_cf(&cf, key)
-                .map_err(|e| EngineError::IoError(format!("get key from {table} failed: {e}")))?)
+            Ok(self.inner.get_cf(&cf, key)?)
         } else {
             Err(EngineError::TableNotFound(table.to_owned()))
         }
@@ -58,12 +75,8 @@
             self.inner
                 .multi_get_cf(repeat(&cf).zip(keys.iter()))
                 .into_iter()
-                .map(|res| {
-                    res.map_err(|err| {
-                        EngineError::IoError(format!("get key from {table} failed: {err}"))
-                    })
-                })
-                .collect::<Result<Vec<_>, _>>()
+                .map(|res| res.map_err(EngineError::from))
+                .collect::<Result<Vec<_>, EngineError>>()
         } else {
             Err(EngineError::TableNotFound(table.to_owned()))
         }
@@ -75,21 +88,21 @@
         for op in wr_ops {
             match op {
-                WriteOperation::Put(Put { table, key, value }) => {
+                WriteOperation::Put { table, key, value } => {
                     let cf = self
                         .inner
                         .cf_handle(table)
                         .ok_or(EngineError::TableNotFound(table.to_owned()))?;
                     batch.put_cf(&cf, key, value);
                 }
-                WriteOperation::Delete(Delete { table, key }) => {
+                WriteOperation::Delete { table, key } => {
                     let cf = self
                         .inner
                         .cf_handle(table)
                         .ok_or(EngineError::TableNotFound(table.to_owned()))?;
                     batch.delete_cf(&cf, key);
                 }
-                WriteOperation::DeleteRange(DeleteRange { table, from, to }) => {
+                WriteOperation::DeleteRange { table, from, to } => {
                     let cf = self
                         .inner
                         .cf_handle(table)
                         .ok_or(EngineError::TableNotFound(table.to_owned()))?;
                     batch.delete_range_cf(&cf, from, to);
@@ -100,9 +113,7 @@
                 }
             }
         }
         let mut opt = WriteOptions::default();
         opt.set_sync(sync);
-        self.inner
-            .write_opt(batch, &opt)
-            .map_err(|e| EngineError::UnderlyingError(format!("{e}")))
+        self.inner.write_opt(batch, &opt).map_err(EngineError::from)
     }
 }
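The `From<RocksError>` impl above leans on RocksDB's convention of rendering errors as a kind followed by a colon and a message. A hedged mirror of that parsing on plain strings, with a local stand-in for `EngineError`; the sample messages are assumptions about RocksDB's wording, not captured output:

```rust
// Illustrative only: mirrors the string-classification logic of the
// `From<RocksError>` impl in the diff above.
#[derive(Debug, PartialEq, Eq)]
enum Classified {
    Corruption(String),
    TableNotFound(String),
    InvalidArgument(String),
    IoError(String),
    Underlying(String),
}

fn classify(err: &str) -> Classified {
    // RocksDB renders errors roughly as "<kind>: <message>".
    if let Some((kind, msg)) = err.split_once(':') {
        match kind {
            "Corruption" => Classified::Corruption(msg.to_owned()),
            "Invalid argument" => {
                if let Some(table) = msg.strip_prefix(" Column family not found: ") {
                    Classified::TableNotFound(table.to_owned())
                } else {
                    Classified::InvalidArgument(msg.to_owned())
                }
            }
            "IO error" => Classified::IoError(msg.to_owned()),
            _ => Classified::Underlying(msg.to_owned()),
        }
    } else {
        Classified::Underlying(err.to_owned())
    }
}

fn main() {
    assert_eq!(
        classify("Invalid argument: Column family not found: hello"),
        Classified::TableNotFound("hello".to_owned())
    );
    // `split_once(':')` keeps everything after the first colon, so the
    // message arrives with its leading space intact.
    assert_eq!(
        classify("IO error: disk full"),
        Classified::IoError(" disk full".to_owned())
    );
}
```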
@@ -112,7 +123,7 @@
 ///
 /// Panic if db destroy failed.
 #[cfg(test)]
-pub fn destroy(data_dir: &PathBuf) {
+pub fn destroy(data_dir: impl AsRef<Path>) {
     #[allow(clippy::unwrap_used)]
     DB::destroy(&Options::default(), data_dir).unwrap();
 }
@@ -132,18 +143,17 @@ fn write_batch_into_a_non_existing_table_should_fail() {
         let data_dir = PathBuf::from("/tmp/write_batch_into_a_non_existing_table_should_fail");
         let engine = RocksEngine::new(&data_dir, &TESTTABLES).unwrap();
 
-        let put = WriteOperation::Put(Put::new(
+        let put = WriteOperation::new_put(
             "hello",
             "hello".as_bytes().to_vec(),
             "world".as_bytes().to_vec(),
-        ));
+        );
         assert!(engine.write_batch(vec![put], false).is_err());
 
-        let delete = WriteOperation::Delete(Delete::new("hello", b"hello"));
+        let delete = WriteOperation::new_delete("hello", b"hello");
         assert!(engine.write_batch(vec![delete], false).is_err());
 
-        let delete_range =
-            WriteOperation::DeleteRange(DeleteRange::new("hello", b"hello", b"world"));
+        let delete_range = WriteOperation::new_delete_range("hello", b"hello", b"world");
         assert!(engine.write_batch(vec![delete_range], false).is_err());
 
         drop(engine);
@@ -160,7 +170,7 @@
         let keys = origin_set.clone();
         let values = origin_set.clone();
         let puts = zip(keys, values)
-            .map(|(k, v)| WriteOperation::Put(Put::new("kv", k, v)))
+            .map(|(k, v)| WriteOperation::new_put("kv", k, v))
             .collect::<Vec<WriteOperation<'_>>>();
 
         assert!(engine.write_batch(puts, false).is_ok());
 
@@ -169,7 +179,7 @@
         assert_eq!(res_1.iter().filter(|v| v.is_some()).count(), 10);
 
         let delete_key: Vec<u8> = vec![1, 1, 1, 1];
-        let delete = WriteOperation::Delete(Delete::new("kv", delete_key.as_slice()));
+        let delete = WriteOperation::new_delete("kv", delete_key.as_slice());
         let res_2 = engine.write_batch(vec![delete], false);
         assert!(res_2.is_ok());
 
@@ -179,11 +189,8 @@
         let delete_start: Vec<u8> = vec![2, 2, 2, 2];
         let delete_end: Vec<u8> = vec![5, 5, 5, 5];
-        let delete_range = WriteOperation::DeleteRange(DeleteRange::new(
-            "kv",
-            delete_start.as_slice(),
-            &delete_end.as_slice(),
-        ));
+        let delete_range =
+            WriteOperation::new_delete_range("kv", delete_start.as_slice(), &delete_end.as_slice());
         let res_4 = engine.write_batch(vec![delete_range], false);
         assert!(res_4.is_ok());
diff --git a/scripts/quick_start.sh b/scripts/quick_start.sh
index ff5bcad62f..aeb143b81a 100755
--- a/scripts/quick_start.sh
+++ b/scripts/quick_start.sh
@@ -13,7 +13,6 @@ run_xline() {
     cmd="/usr/local/bin/xline \
         --name node${1} \
         --members ${MEMBERS} \
-        --storage-engine rocksdb \
         --auth-public-key /mnt/public.pem \
         --auth-private-key /mnt/private.pem"
diff --git a/utils/src/config.rs b/utils/src/config.rs
index 71b5ad2582..c1901d2363 100644
--- a/utils/src/config.rs
+++ b/utils/src/config.rs
@@ -13,6 +13,7 @@ pub struct XlineServerConfig {
     cluster: ClusterConfig,
     /// xline storage configuration object
     #[getset(get = "pub")]
+    #[serde(default = "StorageConfig::default")]
     storage: StorageConfig,
     /// log configuration object
     #[getset(get = "pub")]
@@ -285,7 +286,7 @@ impl Default for ClientTimeout {
 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
 #[serde(
     tag = "engine",
-    content = "datadir",
+    content = "data_dir",
     rename_all(deserialize = "lowercase")
 )]
 pub enum StorageConfig {
@@ -298,7 +299,7 @@ pub enum StorageConfig {
 impl Default for StorageConfig {
     #[inline]
     fn default() -> Self {
-        StorageConfig::Memory
+        StorageConfig::RocksDB(PathBuf::from("/usr/local/xline/data-dir"))
     }
 }
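For illustration, how the adjacently tagged enum behaves after the rename to `data_dir` and the new default: a minimal sketch assuming the `serde` and `toml` crates, with `Config` standing in for `XlineServerConfig`.

```rust
use std::path::PathBuf;

use serde::Deserialize;

// Same serde attributes as the diff: `engine` picks the variant,
// `data_dir` carries its payload.
#[derive(Debug, Deserialize, PartialEq, Eq)]
#[serde(
    tag = "engine",
    content = "data_dir",
    rename_all(deserialize = "lowercase")
)]
enum StorageConfig {
    Memory,
    RocksDB(PathBuf),
}

impl Default for StorageConfig {
    fn default() -> Self {
        StorageConfig::RocksDB(PathBuf::from("/usr/local/xline/data-dir"))
    }
}

// Stand-in for `XlineServerConfig`, reduced to the storage field.
#[derive(Debug, Deserialize)]
struct Config {
    #[serde(default = "StorageConfig::default")]
    storage: StorageConfig,
}

fn main() {
    let explicit: Config =
        toml::from_str("[storage]\nengine = 'rocksdb'\ndata_dir = '/tmp/xline/data-dir'\n")
            .unwrap();
    assert_eq!(
        explicit.storage,
        StorageConfig::RocksDB(PathBuf::from("/tmp/xline/data-dir"))
    );

    // Omitting [storage] entirely now falls back to the RocksDB default.
    let defaulted: Config = toml::from_str("").unwrap();
    assert_eq!(defaulted.storage, StorageConfig::default());
}
```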
@@ -578,7 +579,7 @@
             )
         );
 
-        assert_eq!(config.storage, StorageConfig::default());
+        assert_eq!(config.storage, StorageConfig::Memory);
 
         assert_eq!(
             config.log,
@@ -612,10 +613,6 @@
             node2 = '127.0.0.1:2380'
             node3 = '127.0.0.1:2381'
 
-            [storage]
-            engine = 'rocksdb'
-            datadir = '/tmp/xline/data-dir'
-
             [log]
             path = '/var/log/xline'
@@ -647,7 +644,7 @@
         );
 
         if let StorageConfig::RocksDB(path) = config.storage {
-            assert_eq!(path, PathBuf::from("/tmp/xline/data-dir"));
+            assert_eq!(path, PathBuf::from("/usr/local/xline/data-dir"));
         } else {
             unreachable!();
         }
diff --git a/xline/src/main.rs b/xline/src/main.rs
index 69b6876e88..aef4d50478 100644
--- a/xline/src/main.rs
+++ b/xline/src/main.rs
@@ -202,7 +202,7 @@ struct ServerArgs {
     #[clap(long, value_parser = parse_duration, default_value = "50ms")]
     client_retry_timeout: Duration,
     /// Storage engine
-    #[clap(long)]
+    #[clap(long, default_value = "rocksdb")]
     storage_engine: String,
     /// DB directory
     #[clap(long, default_value = "/usr/local/xline/data-dir")]
@@ -382,7 +382,7 @@ async fn main() -> Result<()> {
     debug!("server_addr = {:?}", self_addr);
     debug!("cluster_peers = {:?}", cluster_config.members());
 
-    let db_proxy = DBProxy::new(storage_config)?;
+    let db_proxy = DBProxy::open(storage_config)?;
     let server = XlineServer::new(
         cluster_config.name().clone(),
         cluster_config.members().clone(),
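A sketch of why `quick_start.sh` could drop its flag: with `default_value = "rocksdb"`, an invocation that passes no storage flags gets RocksDB under the default data dir. This is trimmed to just the two storage fields (the real `ServerArgs` has many more) and assumes the clap derive setup already used by `main.rs`.

```rust
use std::path::PathBuf;

use clap::Parser;

// Hypothetical trimmed-down ServerArgs, storage fields only.
#[derive(Parser, Debug)]
struct ServerArgs {
    /// Storage engine
    #[clap(long, default_value = "rocksdb")]
    storage_engine: String,
    /// DB directory
    #[clap(long, default_value = "/usr/local/xline/data-dir")]
    data_dir: PathBuf,
}

fn main() {
    // No flags given: both defaults kick in, which is why the quick-start
    // script no longer needs to pass --storage-engine explicitly.
    let args = ServerArgs::parse_from(["xline"]);
    assert_eq!(args.storage_engine, "rocksdb");
    assert_eq!(args.data_dir, PathBuf::from("/usr/local/xline/data-dir"));
}
```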
diff --git a/xline/src/storage/auth_store/store.rs b/xline/src/storage/auth_store/store.rs
index 155798db29..bea1bdf3fa 100644
--- a/xline/src/storage/auth_store/store.rs
+++ b/xline/src/storage/auth_store/store.rs
@@ -421,7 +421,7 @@ mod test {
         let (lease_cmd_tx, _) = mpsc::channel(1);
         #[allow(clippy::unwrap_used)]
-        let auth_db = DBProxy::new(&StorageConfig::default()).unwrap();
+        let auth_db = DBProxy::open(&StorageConfig::Memory).unwrap();
         // It's ok to do so in that a memory engine should return an error
         let store = AuthStore::new(lease_cmd_tx, key_pair, header_gen, auth_db);
diff --git a/xline/src/storage/db.rs b/xline/src/storage/db.rs
index 18bcb4e85a..6c57f820e9 100644
--- a/xline/src/storage/db.rs
+++ b/xline/src/storage/db.rs
@@ -1,8 +1,8 @@
 use std::sync::Arc;
 
 use engine::{
-    engine_api::StorageEngine, memory_engine::MemoryEngine, rocksdb_engine::RocksEngine, Delete,
-    Put, WriteOperation,
+    engine_api::StorageEngine, memory_engine::MemoryEngine, rocksdb_engine::RocksEngine,
+    WriteOperation,
 };
 use utils::config::StorageConfig;
 
@@ -41,7 +41,7 @@ where
         K: Into<Vec<u8>> + std::fmt::Debug + Sized,
         V: Into<Vec<u8>> + std::fmt::Debug + Sized,
     {
-        let put_op = WriteOperation::Put(Put::new(table, key.into(), value.into()));
+        let put_op = WriteOperation::new_put(table, key.into(), value.into());
         self.engine.write_batch(vec![put_op], sync).map_err(|e| {
             ExecuteError::DbError(format!("Failed to insert key-value, error: {e}"))
         })?;
@@ -70,7 +70,7 @@ where
         K: AsRef<[u8]> + std::fmt::Debug + Sized,
     {
-        let del_op = WriteOperation::Delete(Delete::new(table, key.as_ref()));
+        let del_op = WriteOperation::new_delete(table, key.as_ref());
         self.engine
             .write_batch(vec![del_op], sync)
             .map_err(|e| ExecuteError::DbError(format!("Failed to delete Lease, error: {e}")))?;
@@ -138,7 +138,7 @@ impl DBProxy {
     ///
     /// Return `ExecuteError::DbError` when open db failed
     #[inline]
-    pub fn new(config: &StorageConfig) -> Result<Arc<DBProxy>, ExecuteError> {
+    pub fn open(config: &StorageConfig) -> Result<Arc<DBProxy>, ExecuteError> {
         match *config {
             StorageConfig::Memory => {
                 let engine = MemoryEngine::new(&XLINETABLES)
diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs
index 70bbdab196..6a1c7528ad 100644
--- a/xline/src/storage/kv_store.rs
+++ b/xline/src/storage/kv_store.rs
@@ -918,7 +918,7 @@ mod test {
                 assert!(tx.send(0).is_ok());
             }
         });
-        let kv_db = DBProxy::new(&StorageConfig::default())?;
+        let kv_db = DBProxy::open(&StorageConfig::Memory)?;
         let store = KvStore::new(lease_cmd_tx, del_rx, header_gen, kv_db);
         let keys = vec!["a", "b", "c", "d", "e"];
         let vals = vec!["a", "b", "c", "d", "e"];
diff --git a/xline/src/storage/lease_store/mod.rs b/xline/src/storage/lease_store/mod.rs
index 5f1480fae3..6bfa5e2d2b 100644
--- a/xline/src/storage/lease_store/mod.rs
+++ b/xline/src/storage/lease_store/mod.rs
@@ -525,7 +525,7 @@ mod test {
         let state = Arc::new(State::default());
         let header_gen = Arc::new(HeaderGenerator::new(0, 0));
 
-        let lease_db = DBProxy::new(&StorageConfig::default())?;
+        let lease_db = DBProxy::open(&StorageConfig::Memory)?;
         #[allow(clippy::unwrap_used)] // safe unwrap
         let lease_store = LeaseStore::new(del_tx, lease_cmd_rx, state, header_gen, lease_db);
         let _handle = tokio::spawn(async move {
diff --git a/xline/tests/common/mod.rs b/xline/tests/common/mod.rs
index 175c8223c6..be054189c4 100644
--- a/xline/tests/common/mod.rs
+++ b/xline/tests/common/mod.rs
@@ -55,7 +55,7 @@ impl Cluster {
             let listener = self.listeners.remove(&i).unwrap();
             let all_members = self.all_members.clone();
             #[allow(clippy::unwrap_used)]
-            let db = DBProxy::new(&StorageConfig::default()).unwrap();
+            let db = DBProxy::open(&StorageConfig::Memory).unwrap();
             tokio::spawn(async move {
                 let server = XlineServer::new(
                     name,
diff --git a/xline_server.conf b/xline_server.conf
index 347559e05c..757afd95a1 100644
--- a/xline_server.conf
+++ b/xline_server.conf
@@ -41,6 +41,11 @@ node3 = '127.0.0.1:2381'
 # The curp client propose request timeout
 # propose_timeout = '1s'
 
+# Storage Engine Settings; the default is `RocksDB('/usr/local/xline/data-dir')`
+# [storage]
+# engine = 'rocksdb'
+# data_dir = '/usr/local/xline/data-dir'
+
 [log]
 path = '/var/log/xline'
 rotation = 'daily'