Skip to content

Commit

Permalink
Make Table Send
Browse files Browse the repository at this point in the history
This allows inserts into separate tables to be executed concurrently
across multiple threads
  • Loading branch information
cberner committed Feb 11, 2023
1 parent e50cc24 commit a9c2b1c
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 104 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ debug = true
name = "atomics_benchmark"
harness = false

[[bench]]
name = "multithreaded_insert_benchmark"
harness = false

[[bench]]
name = "userspace_cache_benchmark"
harness = false
Expand Down
103 changes: 103 additions & 0 deletions benches/multithreaded_insert_benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use std::env::current_dir;
use std::{fs, process, thread};
use tempfile::NamedTempFile;

use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use redb::{Database, ReadableTable, TableDefinition};
use std::time::Instant;

const ELEMENTS: usize = 1_000_000;
const RNG_SEED: u64 = 3;

const TABLE1: TableDefinition<u128, u128> = TableDefinition::new("x");
const TABLE2: TableDefinition<u128, u128> = TableDefinition::new("y");

// TODO: multi-threaded inserts are slower. Probably due to lock contention checking dirty pages
fn main() {
let mut rng = StdRng::seed_from_u64(RNG_SEED);
let mut values = vec![];
for _ in 0..ELEMENTS {
values.push(rng.gen());
}

let tmpdir = current_dir().unwrap().join(".benchmark");
fs::create_dir(&tmpdir).unwrap();

let tmpdir2 = tmpdir.clone();
ctrlc::set_handler(move || {
fs::remove_dir_all(&tmpdir2).unwrap();
process::exit(1);
})
.unwrap();

{
let tmpfile: NamedTempFile = NamedTempFile::new_in(current_dir().unwrap()).unwrap();
let db = unsafe { Database::builder().create_mmapped(tmpfile.path()).unwrap() };

let start = Instant::now();
let write_txn = db.begin_write().unwrap();
{
let mut table1 = write_txn.open_table(TABLE1).unwrap();
let mut table2 = write_txn.open_table(TABLE2).unwrap();

for value in values.iter() {
table1.insert(value, value).unwrap();
table2.insert(value, value).unwrap();
}
}
write_txn.commit().unwrap();
let end = Instant::now();
let duration = end - start;
println!(
"single threaded load: {} pairs in {}ms",
2 * ELEMENTS,
duration.as_millis()
);
let read_txn = db.begin_read().unwrap();
let table = read_txn.open_table(TABLE1).unwrap();
assert_eq!(table.len().unwrap(), ELEMENTS);
let table = read_txn.open_table(TABLE2).unwrap();
assert_eq!(table.len().unwrap(), ELEMENTS);
}

{
let tmpfile: NamedTempFile = NamedTempFile::new_in(current_dir().unwrap()).unwrap();
let db = unsafe { Database::builder().create_mmapped(tmpfile.path()).unwrap() };

let start = Instant::now();
let write_txn = db.begin_write().unwrap();
{
let mut table1 = write_txn.open_table(TABLE1).unwrap();
let mut table2 = write_txn.open_table(TABLE2).unwrap();

thread::scope(|s| {
s.spawn(|| {
for value in values.iter() {
table1.insert(value, value).unwrap();
}
});
s.spawn(|| {
for value in values.iter() {
table2.insert(value, value).unwrap();
}
});
});
}
write_txn.commit().unwrap();
let end = Instant::now();
let duration = end - start;
println!(
"2 threaded load: {} pairs in {}ms",
2 * ELEMENTS,
duration.as_millis()
);
let read_txn = db.begin_read().unwrap();
let table = read_txn.open_table(TABLE1).unwrap();
assert_eq!(table.len().unwrap(), ELEMENTS);
let table = read_txn.open_table(TABLE2).unwrap();
assert_eq!(table.len().unwrap(), ELEMENTS);
}

fs::remove_dir_all(&tmpdir).unwrap();
}
31 changes: 15 additions & 16 deletions src/multimap_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use crate::tree_store::{
use crate::types::{RedbKey, RedbValue, TypeName};
use crate::{AccessGuard, Result, WriteTransaction};
use std::borrow::Borrow;
use std::cell::RefCell;
use std::convert::TryInto;
use std::marker::PhantomData;
use std::mem;
use std::mem::size_of;
use std::ops::{RangeBounds, RangeFull};
use std::rc::Rc;
use std::sync::{Arc, Mutex};

pub(crate) fn parse_subtree_roots<T: Page>(
page: &T,
Expand Down Expand Up @@ -226,7 +225,7 @@ impl DynamicCollection {
fn iter_free_on_drop<'a, V: RedbKey>(
collection: AccessGuard<'a, &'static DynamicCollection>,
pages: Vec<PageNumber>,
freed_pages: Rc<RefCell<Vec<PageNumber>>>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
mem: &'a TransactionalMemory,
) -> Result<MultimapValueIter<'a, V>> {
Ok(match collection.value().collection_type() {
Expand Down Expand Up @@ -270,7 +269,7 @@ enum ValueIterState<'a, V: RedbKey + 'static> {

pub struct MultimapValueIter<'a, V: RedbKey + 'static> {
inner: Option<ValueIterState<'a, V>>,
freed_pages: Option<Rc<RefCell<Vec<PageNumber>>>>,
freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
free_on_drop: Vec<PageNumber>,
mem: Option<&'a TransactionalMemory>,
_value_type: PhantomData<V>,
Expand All @@ -289,7 +288,7 @@ impl<'a, V: RedbKey + 'static> MultimapValueIter<'a, V> {

fn new_subtree_free_on_drop(
inner: BtreeRangeIter<'a, V, ()>,
freed_pages: Rc<RefCell<Vec<PageNumber>>>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
pages: Vec<PageNumber>,
mem: &'a TransactionalMemory,
) -> Self {
Expand Down Expand Up @@ -341,17 +340,17 @@ impl<'a, V: RedbKey + 'static> Drop for MultimapValueIter<'a, V> {
fn drop(&mut self) {
// Drop our references to the pages that are about to be freed
drop(mem::take(&mut self.inner));
for page in self.free_on_drop.iter() {
unsafe {
// Safety: we have a &mut on the transaction
if !self.mem.unwrap().free_if_uncommitted(*page) {
(*self.freed_pages.as_ref().unwrap())
.borrow_mut()
.push(*page);
if !self.free_on_drop.is_empty() {
let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
for page in self.free_on_drop.iter() {
unsafe {
// Safety: we have a &mut on the transaction
if !self.mem.unwrap().free_if_uncommitted(*page) {
freed_pages.push(*page);
}
}
}
}
if !self.free_on_drop.is_empty() {}
}
}

Expand Down Expand Up @@ -412,7 +411,7 @@ impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> DoubleEndedIterator
pub struct MultimapTable<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> {
name: String,
transaction: &'txn WriteTransaction<'db>,
freed_pages: Rc<RefCell<Vec<PageNumber>>>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
tree: BtreeMut<'txn, K, &'static DynamicCollection>,
mem: &'db TransactionalMemory,
_value_type: PhantomData<V>,
Expand All @@ -422,7 +421,7 @@ impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTable<'db, '
pub(crate) fn new(
name: &str,
table_root: Option<(PageNumber, Checksum)>,
freed_pages: Rc<RefCell<Vec<PageNumber>>>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
mem: &'db TransactionalMemory,
transaction: &'txn WriteTransaction<'db>,
) -> MultimapTable<'db, 'txn, K, V> {
Expand Down Expand Up @@ -702,7 +701,7 @@ impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTable<'db, '
drop(page);
unsafe {
if !self.mem.free_if_uncommitted(new_root) {
(*self.freed_pages).borrow_mut().push(new_root);
(*self.freed_pages).lock().unwrap().push(new_root);
}
}
} else {
Expand Down
5 changes: 2 additions & 3 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use crate::types::{RedbKey, RedbValue};
use crate::Result;
use crate::{AccessGuard, WriteTransaction};
use std::borrow::Borrow;
use std::cell::RefCell;
use std::ops::RangeBounds;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// A table containing key-value mappings
pub struct Table<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> {
Expand All @@ -21,7 +20,7 @@ impl<'db, 'txn, K: RedbKey + 'static, V: RedbValue + 'static> Table<'db, 'txn, K
pub(crate) fn new(
name: &str,
table_root: Option<(PageNumber, Checksum)>,
freed_pages: Rc<RefCell<Vec<PageNumber>>>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
mem: &'db TransactionalMemory,
transaction: &'txn WriteTransaction<'db>,
) -> Table<'db, 'txn, K, V> {
Expand Down
Loading

0 comments on commit a9c2b1c

Please sign in to comment.