Skip to content

Commit

Permalink
feat(exex): WAL metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Oct 2, 2024
1 parent d36683d commit a271501
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 35 deletions.
2 changes: 1 addition & 1 deletion crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl ExExHandle {

/// Metrics for the `ExEx` manager.
#[derive(Metrics)]
#[metrics(scope = "exex_manager")]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
/// Max size of the internal state notifications buffer.
max_capacity: Gauge,
Expand Down
44 changes: 37 additions & 7 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@ use reth_exex_types::ExExNotification;
#[derive(Debug, Default)]
pub struct BlockCache {
/// A min heap of `(Block Number, File ID)` tuples.
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
///
/// Contains one highest block in notification. In a notification with both committed and
/// reverted chain, the highest block is chosen between both chains.
pub(super) notification_max_blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
/// A mapping of committed blocks `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>,
/// Block height of the lowest committed block currently in the cache.
pub(super) lowest_committed_block_height: Option<BlockNumber>,
/// Block height of the highest committed block currently in the cache.
pub(super) highest_committed_block_height: Option<BlockNumber>,
}

impl BlockCache {
/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.blocks.is_empty()
self.notification_max_blocks.is_empty()
}

/// Removes all files from the cache that has notifications with a tip block less than or equal
Expand All @@ -37,17 +44,33 @@ impl BlockCache {
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> {
let mut file_ids = HashSet::default();

while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
while let Some(block @ Reverse((max_block, file_id))) =
self.notification_max_blocks.peek().copied()
{
if max_block <= block_number {
let popped_block = self.blocks.pop().unwrap();
let popped_block = self.notification_max_blocks.pop().unwrap();
debug_assert_eq!(popped_block, block);
file_ids.insert(file_id);
} else {
break
}
}

self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id));
let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None);
self.committed_blocks.retain(|_, (file_id, block)| {
lowest_committed_block_height = Some(
lowest_committed_block_height
.map_or(block.block.number, |lowest| block.block.number.min(lowest)),
);
highest_committed_block_height = Some(
highest_committed_block_height
.map_or(block.block.number, |highest| block.block.number.max(highest)),
);

!file_ids.contains(file_id)
});
self.lowest_committed_block_height = lowest_committed_block_height;
self.highest_committed_block_height = highest_committed_block_height;

file_ids
}
Expand All @@ -70,7 +93,7 @@ impl BlockCache {
let max_block =
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
if let Some(max_block) = max_block {
self.blocks.push(Reverse((max_block, file_id)));
self.notification_max_blocks.push(Reverse((max_block, file_id)));
}

if let Some(committed_chain) = &committed_chain {
Expand All @@ -81,12 +104,19 @@ impl BlockCache {
};
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}

self.highest_committed_block_height = Some(committed_chain.tip().number);
}
}

#[cfg(test)]
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> {
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
self.notification_max_blocks
.clone()
.into_sorted_vec()
.into_iter()
.map(|entry| entry.0)
.collect()
}

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions crates/exex/exex/src/wal/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use metrics::Gauge;
use reth_metrics::Metrics;

/// Metrics for the [WAL](`super::Wal`)
#[derive(Metrics)]
#[metrics(scope = "exex.wal")]
pub(super) struct Metrics {
/// Size of all notifications in WAL in bytes
pub size_bytes: Gauge,
/// Total number of notifications in WAL
pub notifications_total: Gauge,
/// Total number of committed blocks in WAL
pub committed_blocks_total: Gauge,
/// Lowest committed block height in WAL
pub lowest_committed_block_height: Gauge,
/// Highest committed block height in WAL
pub highest_committed_block_height: Gauge,
}
55 changes: 46 additions & 9 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
mod cache;
pub use cache::BlockCache;
mod storage;
use parking_lot::{RwLock, RwLockReadGuard};
pub use storage::Storage;
mod metrics;
use metrics::Metrics;

use std::{
path::Path,
Expand All @@ -16,6 +17,7 @@ use std::{

use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use parking_lot::{RwLock, RwLockReadGuard};
use reth_exex_types::ExExNotification;
use reth_tracing::tracing::{debug, instrument};

Expand Down Expand Up @@ -74,6 +76,7 @@ struct WalInner {
storage: Storage,
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
block_cache: RwLock<BlockCache>,
metrics: Metrics,
}

impl WalInner {
Expand All @@ -82,6 +85,7 @@ impl WalInner {
next_file_id: AtomicU32::new(0),
storage: Storage::new(directory)?,
block_cache: RwLock::new(BlockCache::default()),
metrics: Metrics::default(),
};
wal.fill_block_cache()?;
Ok(wal)
Expand All @@ -98,9 +102,12 @@ impl WalInner {
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);

let mut block_cache = self.block_cache.write();
let mut notifications_size = 0;

for entry in self.storage.iter_notifications(files_range) {
let (file_id, notification) = entry?;
let (file_id, size, notification) = entry?;

notifications_size += size;

let committed_chain = notification.committed_chain();
let reverted_chain = notification.reverted_chain();
Expand All @@ -116,6 +123,8 @@ impl WalInner {
block_cache.insert_notification_blocks_with_file_id(file_id, &notification);
}

self.update_metrics(&block_cache, notifications_size as i64);

Ok(())
}

Expand All @@ -127,30 +136,55 @@ impl WalInner {
let mut block_cache = self.block_cache.write();

let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
self.storage.write_notification(file_id, notification)?;
let size = self.storage.write_notification(file_id, notification)?;

debug!(?file_id, "Inserting notification blocks into the block cache");
block_cache.insert_notification_blocks_with_file_id(file_id, notification);

self.update_metrics(&block_cache, size as i64);

Ok(())
}

#[instrument(target = "exex::wal", skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
let file_ids = self.block_cache.write().remove_before(to_block.number);
let mut block_cache = self.block_cache.write();
let file_ids = block_cache.remove_before(to_block.number);

// Remove notifications from the storage.
if file_ids.is_empty() {
debug!("No notifications were finalized from the storage");
return Ok(())
}

let removed_notifications = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, "Storage was finalized");
let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, ?removed_size, "Storage was finalized");

self.update_metrics(&block_cache, -(removed_size as i64));

Ok(())
}

fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
if size_delta >= 0 {
self.metrics.size_bytes.increment(size_delta as f64);
} else {
self.metrics.size_bytes.decrement(size_delta as f64);
}

self.metrics.notifications_total.set(block_cache.notification_max_blocks.len() as f64);

self.metrics.committed_blocks_total.set(block_cache.committed_blocks.len() as f64);

if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
}

if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
}
}

/// Returns an iterator over all notifications in the WAL.
fn iter_notifications(
&self,
Expand All @@ -159,7 +193,7 @@ impl WalInner {
return Ok(Box::new(std::iter::empty()))
};

Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1))))
Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
}
}

Expand All @@ -180,7 +214,10 @@ impl WalHandle {
return Ok(None)
};

self.wal.storage.read_notification(file_id)
self.wal
.storage
.read_notification(file_id)
.map(|entry| entry.map(|(notification, _)| notification))
}
}

Expand All @@ -205,7 +242,7 @@ mod tests {
wal.inner
.storage
.iter_notifications(files_range)
.map(|entry| Ok(entry?.1))
.map(|entry| Ok(entry?.2))
.collect::<eyre::Result<_>>()
}

Expand Down
Loading

0 comments on commit a271501

Please sign in to comment.