Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Perf] Add cache for most recent blocks #3440

Open
wants to merge 3 commits into
base: staging
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions node/bft/ledger-service/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use indexmap::IndexMap;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
io::Read,
ops::Range,
Expand All @@ -41,12 +42,15 @@ use std::{

/// The capacity of the LRU holding the recently queried committees.
const COMMITTEE_CACHE_SIZE: usize = 16;
/// The capacity of the cache holding the highest blocks.
const BLOCK_CACHE_SIZE: usize = 10;
vicsn marked this conversation as resolved.
Show resolved Hide resolved

/// A core ledger service.
#[allow(clippy::type_complexity)]
pub struct CoreLedgerService<N: Network, C: ConsensusStorage<N>> {
ledger: Ledger<N, C>,
committee_cache: Arc<Mutex<LruCache<u64, Committee<N>>>>,
block_cache: Arc<RwLock<BTreeMap<u32, Block<N>>>>,
latest_leader: Arc<RwLock<Option<(u64, Address<N>)>>>,
shutdown: Arc<AtomicBool>,
}
Expand All @@ -55,7 +59,8 @@ impl<N: Network, C: ConsensusStorage<N>> CoreLedgerService<N, C> {
/// Initializes a new core ledger service.
pub fn new(ledger: Ledger<N, C>, shutdown: Arc<AtomicBool>) -> Self {
let committee_cache = Arc::new(Mutex::new(LruCache::new(COMMITTEE_CACHE_SIZE.try_into().unwrap())));
Self { ledger, committee_cache, latest_leader: Default::default(), shutdown }
let block_cache = Arc::new(RwLock::new(BTreeMap::new()));
Self { ledger, committee_cache, block_cache, latest_leader: Default::default(), shutdown }
}
}

Expand Down Expand Up @@ -120,13 +125,47 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<

/// Returns the block for the given block height.
fn get_block(&self, height: u32) -> Result<Block<N>> {
self.ledger.get_block(height)
if let Some(block) = self.block_cache.read().get(&height) {
Ok(block.clone())
} else {
self.ledger.get_block(height)
}
}

/// Returns the blocks in the given block range.
/// The range is inclusive of the start and exclusive of the end.
fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>> {
self.ledger.get_blocks(heights)
// Prepare blocks to collect.
let mut blocks = Vec::with_capacity(heights.len());
// Determine the lowest height.
let start_height = heights.start;
// Prepare missing heights to collect.
let mut missing_heights = BTreeSet::from_iter(heights.clone());
// For each height in the range, check if the block is in the cache.
for height in heights {
if let Some(block) = self.block_cache.read().get(&height) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would probably be more perf-friendly to keep a read guard for the entire duration of this loop

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could also be used to quickly skip the attempt to obtain blocks from the cache in case its 1st and last entries are outside of the requested range

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would probably be more perf-friendly to keep a read guard for the entire duration of this loop

Great idea: bb00286

it could also be used to quickly skip the attempt to obtain blocks from the cache in case its 1st and last entries are outside of the requested range

Interesting idea, though I'd rather keep it as is and make the happy path (which matters for honest validators) fast and readable.

let index = block.height().saturating_sub(start_height) as usize;
blocks.insert(index, block.clone());
missing_heights.remove(&height);
}
}
// Check if there are any blocks not found in the cache.
let missing_heights = match (missing_heights.first(), missing_heights.last()) {
// All blocks found in the cache.
(None, None) => return Ok(blocks),
// Some blocks found in the cache.
(Some(&start), Some(&end)) => start..end + 1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(Some(&start), Some(&end)) => start..end + 1,
(Some(&start), Some(&end)) => start..=end,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error[E0308]: mismatched types
   --> node/bft/ledger-service/src/ledger.rs:167:45
    |
167 |         for block in self.ledger.get_blocks(missing_heights)? {
    |                                  ---------- ^^^^^^^^^^^^^^^ expected `Range<u32>`, found `RangeInclusive<u32>`
    |                                  |
    |                                  arguments to this method are incorrect

// Something went wrong.
_ => bail!("Missing heights has an unexpected block range."),
};

// Retrieve the missing blocks from the ledger.
for block in self.ledger.get_blocks(missing_heights)? {
let index = block.height().saturating_sub(start_height) as usize;
blocks.insert(index, block);
}

Ok(blocks)
}

/// Returns the solution for the given solution ID.
Expand Down Expand Up @@ -365,6 +404,12 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
}
// Advance to the next block.
self.ledger.advance_to_next_block(block)?;
// Add the block to the block cache.
self.block_cache.write().insert(block.height(), block.clone());
// Prune the block cache if it exceeds the maximum size.
if self.block_cache.read().len() > BLOCK_CACHE_SIZE {
self.block_cache.write().pop_first();
}
// Update BFT metrics.
#[cfg(feature = "metrics")]
{
Expand Down