Skip to content

Commit

Permalink
Replace OptimisticTransactionDB with DB and WriteBatchWithIndex
Browse files Browse the repository at this point in the history
This commit replaces the OptimisticTransactionDB with the normal DB
and uses WriteBatchWithIndex instead of transactions.

This fixes restatedev#1428.
  • Loading branch information
tillrohrmann committed Aug 26, 2024
1 parent b4a65dd commit c7f1402
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 319 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ reqwest = { version = "0.12.5", default-features = false, features = [
"stream",
] }
rlimit = { version = "0.10.1" }
rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "8b621f1b4fb15ed0ba4a4b9da239432159e40b01" }
rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "3e9a11a7f6c07f69fe8744727af099b12cfd0015" }
rustls = { version = "0.23.11", default-features = false, features = ["ring"] }
schemars = { version = "0.8", features = ["bytes", "enumset"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/bifrost/src/providers/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ impl RocksDbLogStore {
// it's also a small cf so it should be quick.
.add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
.ensure_column_families(cfs)
.build_as_db();
.build()
.expect("valid spec");
let db_name = db_spec.name().clone();
// todo: use the returned rocksdb object when open_db returns Arc<RocksDb>
let _ = db_manager.open_db(updateable_options, db_spec).await?;
Expand Down
3 changes: 2 additions & 1 deletion crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ impl LocalMetadataStore {
cf_options(options.rocksdb_memory_budget()),
)
.ensure_column_families(cfs)
.build_as_db();
.build()
.expect("valid spec");

let db = db_manager
.open_db(updateable_rocksdb_options.clone(), db_spec)
Expand Down
59 changes: 36 additions & 23 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use restate_types::config::Configuration;
use rocksdb::DBCompressionType;
use rocksdb::DBPinnableSlice;
use rocksdb::DBRawIteratorWithThreadMode;
use rocksdb::MultiThreaded;
use rocksdb::PrefixRange;
use rocksdb::ReadOptions;
use rocksdb::{BoundColumnFamily, SliceTransform};
Expand All @@ -41,8 +40,7 @@ use crate::keys::TableKey;
use crate::scan::PhysicalScan;
use crate::scan::TableScan;

pub type DB = rocksdb::OptimisticTransactionDB<MultiThreaded>;
type TransactionDB<'a> = rocksdb::Transaction<'a, DB>;
pub type DB = rocksdb::DB;

pub type DBIterator<'b> = DBRawIteratorWithThreadMode<'b, DB>;
pub type DBIteratorTransaction<'b> = DBRawIteratorWithThreadMode<'b, rocksdb::Transaction<'b, DB>>;
Expand Down Expand Up @@ -401,7 +399,8 @@ impl PartitionStore {
});

PartitionStoreTransaction {
txn: self.raw_db.transaction(),
write_batch_with_index: rocksdb::WriteBatchWithIndex::new(0, true),
raw_db: self.raw_db.as_ref(),
data_cf_handle,
rocksdb,
key_buffer: &mut self.key_buffer,
Expand Down Expand Up @@ -499,7 +498,8 @@ pub enum ScanMode {
pub struct PartitionStoreTransaction<'a> {
partition_id: PartitionId,
partition_key_range: &'a RangeInclusive<PartitionKey>,
txn: rocksdb::Transaction<'a, DB>,
write_batch_with_index: rocksdb::WriteBatchWithIndex,
raw_db: &'a DB,
rocksdb: Arc<RocksDb>,
data_cf_handle: Arc<BoundColumnFamily<'a>>,
key_buffer: &'a mut BytesMut,
Expand All @@ -512,14 +512,15 @@ impl<'a> PartitionStoreTransaction<'a> {
table: TableKind,
_key_kind: KeyKind,
prefix: Bytes,
) -> DBIteratorTransaction {
) -> DBIterator {
let table = self.table_handle(table);
let mut opts = ReadOptions::default();
let mut opts = rocksdb::ReadOptions::default();
opts.set_iterate_range(PrefixRange(prefix.clone()));
opts.set_prefix_same_as_start(true);
opts.set_total_order_seek(false);

let mut it = self.txn.raw_iterator_cf_opt(table, opts);
let it = self.raw_db.raw_iterator_cf_opt(table, opts);
let mut it = self.write_batch_with_index.iterator_with_base_cf(it, table);
it.seek(prefix);
it
}
Expand All @@ -531,14 +532,16 @@ impl<'a> PartitionStoreTransaction<'a> {
scan_mode: ScanMode,
from: Bytes,
to: Bytes,
) -> DBIteratorTransaction {
) -> DBIterator {
let table = self.table_handle(table);
let mut opts = ReadOptions::default();
let mut opts = rocksdb::ReadOptions::default();
// todo: use auto_prefix_mode, at the moment, rocksdb doesn't expose this through the C
// binding.
opts.set_total_order_seek(scan_mode == ScanMode::TotalOrder);
opts.set_iterate_range(from.clone()..to);
let mut it = self.txn.raw_iterator_cf_opt(table, opts);

let it = self.raw_db.raw_iterator_cf_opt(table, opts);
let mut it = self.write_batch_with_index.iterator_with_base_cf(it, table);
it.seek(from);
it
}
Expand Down Expand Up @@ -580,8 +583,7 @@ impl<'a> Transaction for PartitionStoreTransaction<'a> {
// We cannot directly commit the txn because it might fail because of unrelated concurrent
// writes to RocksDB. However, it is safe to write the WriteBatch for a given partition,
// because there can only be a single writer (the leading PartitionProcessor).
let write_batch = self.txn.get_writebatch();
if write_batch.is_empty() {
if self.write_batch_with_index.is_empty() {
return Ok(());
}
let io_mode = if Configuration::pinned()
Expand All @@ -597,14 +599,20 @@ impl<'a> Transaction for PartitionStoreTransaction<'a> {
// We disable WAL since bifrost is our durable distributed log.
opts.disable_wal(true);
self.rocksdb
.write_tx_batch(Priority::High, io_mode, opts, write_batch)
.write_batch_with_index(
"partition-store-txn-commit",
Priority::High,
io_mode,
opts,
self.write_batch_with_index,
)
.await
.map_err(|error| StorageError::Generic(error.into()))
}
}

impl<'a> StorageAccess for PartitionStoreTransaction<'a> {
type DBAccess<'b> = TransactionDB<'b> where Self: 'b;
type DBAccess<'b> = DB where Self: 'b;

fn iterator_from<K: TableKey>(
&self,
Expand Down Expand Up @@ -660,21 +668,26 @@ impl<'a> StorageAccess for PartitionStoreTransaction<'a> {
#[inline]
fn get<K: AsRef<[u8]>>(&self, table: TableKind, key: K) -> Result<Option<DBPinnableSlice>> {
let table = self.table_handle(table);
self.txn
.get_pinned_cf(table, key)
self.write_batch_with_index
.get_pinned_from_batch_and_db_cf(
self.raw_db,
table,
key,
&rocksdb::ReadOptions::default(),
)
.map_err(|error| StorageError::Generic(error.into()))
}

#[inline]
fn put_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
let table = self.table_handle(table);
self.txn.put_cf(table, key, value).unwrap();
fn put_cf(&mut self, _table: TableKind, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.write_batch_with_index
.put_cf(&self.data_cf_handle, key, value);
}

#[inline]
fn delete_cf(&mut self, table: TableKind, key: impl AsRef<[u8]>) {
let table = self.table_handle(table);
self.txn.delete_cf(table, key).unwrap();
fn delete_cf(&mut self, _table: TableKind, key: impl AsRef<[u8]>) {
self.write_batch_with_index
.delete_cf(&self.data_cf_handle, key);
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl PartitionStoreManager {
cf_options(per_partition_memory_budget),
)
.ensure_column_families(partition_ids_to_cfs(initial_partition_set))
.build_as_optimistic_db();
.build()
.expect("valid spec");

let manager = RocksDbManager::get();
let raw_db = manager.open_db(updateable_opts, db_spec).await?;
Expand Down
8 changes: 4 additions & 4 deletions crates/rocksdb/src/db_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ impl RocksDbManager {
self.dbs.read().get(&name).cloned()
}

pub async fn open_db<T: RocksAccess + Send + Sync + 'static>(
pub async fn open_db(
&'static self,
mut updateable_opts: BoxedLiveLoad<RocksDbOptions>,
mut db_spec: DbSpec<T>,
) -> Result<Arc<T>, RocksError> {
mut db_spec: DbSpec,
) -> Result<Arc<rocksdb::DB>, RocksError> {
if self
.shutting_down
.load(std::sync::atomic::Ordering::Acquire)
Expand All @@ -154,7 +154,7 @@ impl RocksDbManager {
self.amend_db_options(&mut db_spec.db_options, &options);

// todo: move to bg thread pool
let db = Arc::new(RocksAccess::open_db(
let db = Arc::new(rocksdb::DB::open_db(
&db_spec,
self.default_cf_options(&options),
)?);
Expand Down
22 changes: 4 additions & 18 deletions crates/rocksdb/src/db_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl CfNameMatch for CfExactPattern {

#[derive(Builder)]
#[builder(pattern = "owned", build_fn(name = "build"))]
pub struct DbSpec<T> {
pub struct DbSpec {
pub(crate) name: DbName,
pub(crate) path: PathBuf,
/// All column families that should be flushed on shutdown, no flush will be performed if empty
Expand All @@ -141,18 +141,16 @@ pub struct DbSpec<T> {
/// a column family didn't match any, opening the database or the column family will fail with
/// `UnknownColumnFamily` error
pub(crate) cf_patterns: Vec<(BoxedCfMatcher, BoxedCfOptionUpdater)>,
#[builder(setter(skip))]
_phantom: std::marker::PhantomData<T>,
}

impl<T> DbSpec<T> {
impl DbSpec {
pub fn name(&self) -> &DbName {
&self.name
}
}

impl<T> DbSpecBuilder<T> {
pub fn new(name: DbName, path: PathBuf, db_options: rocksdb::Options) -> DbSpecBuilder<T> {
impl DbSpecBuilder {
pub fn new(name: DbName, path: PathBuf, db_options: rocksdb::Options) -> DbSpecBuilder {
Self {
name: Some(name),
path: Some(path),
Expand Down Expand Up @@ -182,15 +180,3 @@ impl<T> DbSpecBuilder<T> {
self
}
}

impl DbSpecBuilder<rocksdb::DB> {
pub fn build_as_db(self) -> DbSpec<rocksdb::DB> {
self.build().unwrap()
}
}

impl DbSpecBuilder<rocksdb::OptimisticTransactionDB> {
pub fn build_as_optimistic_db(self) -> DbSpec<rocksdb::OptimisticTransactionDB> {
self.build().unwrap()
}
}
Loading

0 comments on commit c7f1402

Please sign in to comment.