Skip to content

Commit

Permalink
Use upgradable rwlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
macladson committed Sep 4, 2023
1 parent e148e57 commit 3dc4933
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 29 deletions.
53 changes: 40 additions & 13 deletions src/packed_leaf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{utils::arb_rwlock, Error, UpdateMap};
use arbitrary::Arbitrary;
use derivative::Derivative;
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use std::ops::ControlFlow;
use tree_hash::{Hash256, TreeHash, BYTES_PER_CHUNK};

Expand All @@ -27,27 +27,54 @@ where
}

impl<T: TreeHash + Clone> PackedLeaf<T> {
pub fn tree_hash(&self) -> Hash256 {
let read_lock = self.hash.read();
let mut hash = *read_lock;
drop(read_lock);

if !hash.is_zero() {
return hash;
}

fn compute_hash(&self, mut hash: Hash256) -> Hash256 {
let hash_bytes = hash.as_bytes_mut();

let value_len = BYTES_PER_CHUNK / T::tree_hash_packing_factor();
for (i, value) in self.values.iter().enumerate() {
hash_bytes[i * value_len..(i + 1) * value_len]
.copy_from_slice(&value.tree_hash_packed_encoding());
}

*self.hash.write() = hash;
hash
}

pub fn tree_hash(&self) -> Hash256 {
let read_lock = self.hash.upgradable_read();
let hash = *read_lock;

if !hash.is_zero() {
hash
} else {
match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
Ok(mut write_lock) => {
// If we successfully acquire the lock we are guaranteed to be the first and
// only thread attempting to write the hash.
let tree_hash = self.compute_hash(hash);

*write_lock = tree_hash;
tree_hash
}
Err(lock) => {
// Another thread is holding a lock. Drop the lock and attempt to
// acquire a new one. This will avoid a deadlock.
RwLockUpgradableReadGuard::unlock_fair(lock);
let mut write_lock = self.hash.write();

// Since we just acquired the write lock normally, another thread may have
// just finished computing the hash. If so, return it.
let existing_hash = *write_lock;
if !existing_hash.is_zero() {
return existing_hash;
}

let tree_hash = self.compute_hash(hash);

*write_lock = tree_hash;
tree_hash
}
}
}
}

pub fn empty() -> Self {
PackedLeaf {
hash: RwLock::new(Hash256::zero()),
Expand Down
82 changes: 66 additions & 16 deletions src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{Arc, Error, Leaf, PackedLeaf, UpdateMap, Value};
use arbitrary::Arbitrary;
use derivative::Derivative;
use ethereum_hashing::{hash32_concat, ZERO_HASHES};
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use serde::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -531,10 +531,8 @@ impl<T: Value + Send + Sync> Tree<T> {
pub fn tree_hash(&self) -> Hash256 {
match self {
Self::Leaf(Leaf { hash, value }) => {
// FIXME(sproul): upgradeable RwLock?
let read_lock = hash.read();
let read_lock = hash.upgradable_read();
let existing_hash = *read_lock;
drop(read_lock);

// NOTE: We re-compute the hash whenever it is non-zero. Computed hashes may
// legitimately be zero, but this only occurs at the leaf level when the value is
Expand All @@ -546,28 +544,80 @@ impl<T: Value + Send + Sync> Tree<T> {
if !existing_hash.is_zero() {
existing_hash
} else {
let tree_hash = value.tree_hash_root();
*hash.write() = tree_hash;
tree_hash
match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
Ok(mut write_lock) => {
// If we successfully acquire the lock we are guaranteed to be the first and
// only thread attempting to write the hash.
let tree_hash = value.tree_hash_root();
*write_lock = tree_hash;
tree_hash
}
Err(lock) => {
// Another thread is holding a lock. Drop the lock and attempt to
// acquire a new one. This will avoid a deadlock.
RwLockUpgradableReadGuard::unlock_fair(lock);
let mut write_lock = hash.write();

// Since we just acquired the write lock normally, another thread may have
// just finished computing the hash. If so, return it.
let existing_hash = *write_lock;
if !existing_hash.is_zero() {
return existing_hash;
}

let tree_hash = value.tree_hash_root();
*write_lock = tree_hash;
tree_hash
}
}
}
}
Self::PackedLeaf(leaf) => leaf.tree_hash(),
Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]),
Self::Node { hash, left, right } => {
let read_lock = hash.read();
fn node_tree_hash<T: Value + Send + Sync>(
left: &Arc<Tree<T>>,
right: &Arc<Tree<T>>,
) -> Hash256 {
let (left_hash, right_hash) =
rayon::join(|| left.tree_hash(), || right.tree_hash());
Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()))
}

let read_lock = hash.upgradable_read();
let existing_hash = *read_lock;
drop(read_lock);

if !existing_hash.is_zero() {
existing_hash
} else {
// Parallelism goes brrrr.
let (left_hash, right_hash) =
rayon::join(|| left.tree_hash(), || right.tree_hash());
let tree_hash =
Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()));
*hash.write() = tree_hash;
tree_hash
match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
Ok(mut write_lock) => {
// If we successfully acquire the lock we are guaranteed to be the first and
// only thread attempting to write the hash.
let tree_hash = node_tree_hash(left, right);

*write_lock = tree_hash;
tree_hash
}
Err(lock) => {
// Another thread is holding a lock. Drop the lock and attempt to
// acquire a new one. This will avoid a deadlock.
RwLockUpgradableReadGuard::unlock_fair(lock);
let mut write_lock = hash.write();

// Since we just acquired the write lock normally, another thread may have
// just finished computing the hash. If so, return it.
let existing_hash = *write_lock;
if !existing_hash.is_zero() {
return existing_hash;
}

let tree_hash = node_tree_hash(left, right);

*write_lock = tree_hash;
tree_hash
}
}
}
}
}
Expand Down

0 comments on commit 3dc4933

Please sign in to comment.