Skip to content

Commit

Permalink
feat(exex): WAL metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Oct 2, 2024
1 parent d36683d commit f8c12ff
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 34 deletions.
2 changes: 1 addition & 1 deletion crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl ExExHandle {

/// Metrics for the `ExEx` manager.
#[derive(Metrics)]
#[metrics(scope = "exex_manager")]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
/// Max size of the internal state notifications buffer.
max_capacity: Gauge,
Expand Down
44 changes: 37 additions & 7 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@ use reth_exex_types::ExExNotification;
#[derive(Debug, Default)]
pub struct BlockCache {
/// A min heap of `(Block Number, File ID)` tuples.
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
///
/// Contains one highest block in notification. In a notification with both committed and
/// reverted chain, the highest block is chosen between both chains.
pub(super) notification_max_blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
/// A mapping of committed blocks `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>,
/// Block height of the lowest committed block currently in the cache.
pub(super) lowest_committed_block_height: Option<BlockNumber>,
/// Block height of the highest committed block currently in the cache.
pub(super) highest_committed_block_height: Option<BlockNumber>,
}

impl BlockCache {
/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.blocks.is_empty()
self.notification_max_blocks.is_empty()
}

/// Removes all files from the cache that has notifications with a tip block less than or equal
Expand All @@ -37,17 +44,33 @@ impl BlockCache {
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> {
let mut file_ids = HashSet::default();

while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
while let Some(block @ Reverse((max_block, file_id))) =
self.notification_max_blocks.peek().copied()
{
if max_block <= block_number {
let popped_block = self.blocks.pop().unwrap();
let popped_block = self.notification_max_blocks.pop().unwrap();
debug_assert_eq!(popped_block, block);
file_ids.insert(file_id);
} else {
break
}
}

self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id));
let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None);
self.committed_blocks.retain(|_, (file_id, block)| {
lowest_committed_block_height = Some(
lowest_committed_block_height
.map_or(block.block.number, |lowest| block.block.number.min(lowest)),
);
highest_committed_block_height = Some(
highest_committed_block_height
.map_or(block.block.number, |highest| block.block.number.max(highest)),
);

!file_ids.contains(file_id)
});
self.lowest_committed_block_height = lowest_committed_block_height;
self.highest_committed_block_height = highest_committed_block_height;

file_ids
}
Expand All @@ -70,7 +93,7 @@ impl BlockCache {
let max_block =
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
if let Some(max_block) = max_block {
self.blocks.push(Reverse((max_block, file_id)));
self.notification_max_blocks.push(Reverse((max_block, file_id)));
}

if let Some(committed_chain) = &committed_chain {
Expand All @@ -81,12 +104,19 @@ impl BlockCache {
};
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}

self.highest_committed_block_height = Some(committed_chain.tip().number);
}
}

#[cfg(test)]
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> {
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
self.notification_max_blocks
.clone()
.into_sorted_vec()
.into_iter()
.map(|entry| entry.0)
.collect()
}

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions crates/exex/exex/src/wal/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use metrics::Gauge;
use reth_metrics::Metrics;

/// Metrics for the [WAL](`super::Wal`)
#[derive(Metrics)]
#[metrics(scope = "exex.wal")]
pub struct Metrics {
/// Size of all notifications in WAL in bytes
pub size_bytes: Gauge,
/// Total number of notifications in WAL
pub notifications_total: Gauge,
/// Total number of committed blocks in WAL
pub committed_blocks_total: Gauge,
/// Lowest committed block height in WAL
pub lowest_committed_block_height: Gauge,
/// Highest committed block height in WAL
pub highest_committed_block_height: Gauge,
}
55 changes: 46 additions & 9 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
mod cache;
pub use cache::BlockCache;
mod storage;
use parking_lot::{RwLock, RwLockReadGuard};
pub use storage::Storage;
mod metrics;
pub use metrics::Metrics;

use std::{
path::Path,
Expand All @@ -16,6 +17,7 @@ use std::{

use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use parking_lot::{RwLock, RwLockReadGuard};
use reth_exex_types::ExExNotification;
use reth_tracing::tracing::{debug, instrument};

Expand Down Expand Up @@ -74,6 +76,7 @@ struct WalInner {
storage: Storage,
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
block_cache: RwLock<BlockCache>,
metrics: Metrics,
}

impl WalInner {
Expand All @@ -82,6 +85,7 @@ impl WalInner {
next_file_id: AtomicU32::new(0),
storage: Storage::new(directory)?,
block_cache: RwLock::new(BlockCache::default()),
metrics: Metrics::default(),
};
wal.fill_block_cache()?;
Ok(wal)
Expand All @@ -98,9 +102,12 @@ impl WalInner {
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);

let mut block_cache = self.block_cache.write();
let mut notifications_size = 0;

for entry in self.storage.iter_notifications(files_range) {
let (file_id, notification) = entry?;
let (file_id, size, notification) = entry?;

notifications_size += size;

let committed_chain = notification.committed_chain();
let reverted_chain = notification.reverted_chain();
Expand All @@ -116,6 +123,8 @@ impl WalInner {
block_cache.insert_notification_blocks_with_file_id(file_id, &notification);
}

self.update_metrics(&block_cache, notifications_size as i64);

Ok(())
}

Expand All @@ -127,30 +136,55 @@ impl WalInner {
let mut block_cache = self.block_cache.write();

let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
self.storage.write_notification(file_id, notification)?;
let size = self.storage.write_notification(file_id, notification)?;

debug!(?file_id, "Inserting notification blocks into the block cache");
block_cache.insert_notification_blocks_with_file_id(file_id, notification);

self.update_metrics(&block_cache, size as i64);

Ok(())
}

#[instrument(target = "exex::wal", skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
let file_ids = self.block_cache.write().remove_before(to_block.number);
let mut block_cache = self.block_cache.write();
let file_ids = block_cache.remove_before(to_block.number);

// Remove notifications from the storage.
if file_ids.is_empty() {
debug!("No notifications were finalized from the storage");
return Ok(())
}

let removed_notifications = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, "Storage was finalized");
let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, ?removed_size, "Storage was finalized");

self.update_metrics(&block_cache, -(removed_size as i64));

Ok(())
}

fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
if size_delta >= 0 {
self.metrics.size_bytes.increment(size_delta as f64);
} else {
self.metrics.size_bytes.decrement(size_delta as f64);
}

self.metrics.notifications_total.set(block_cache.notification_max_blocks.len() as f64);

self.metrics.committed_blocks_total.set(block_cache.committed_blocks.len() as f64);

if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
}

if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
}
}

/// Returns an iterator over all notifications in the WAL.
fn iter_notifications(
&self,
Expand All @@ -159,7 +193,7 @@ impl WalInner {
return Ok(Box::new(std::iter::empty()))
};

Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1))))
Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
}
}

Expand All @@ -180,7 +214,10 @@ impl WalHandle {
return Ok(None)
};

self.wal.storage.read_notification(file_id)
self.wal
.storage
.read_notification(file_id)
.map(|entry| entry.map(|(notification, _)| notification))
}
}

Expand All @@ -205,7 +242,7 @@ mod tests {
wal.inner
.storage
.iter_notifications(files_range)
.map(|entry| Ok(entry?.1))
.map(|entry| Ok(entry?.2))
.collect::<eyre::Result<_>>()
}

Expand Down
54 changes: 37 additions & 17 deletions crates/exex/exex/src/wal/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,23 @@ impl Storage {
}

/// Removes notification for the given file ID from the storage.
///
/// # Returns
///
/// The size of the file that was removed in bytes, if any.
#[instrument(target = "exex::wal::storage", skip(self))]
fn remove_notification(&self, file_id: u32) -> bool {
fn remove_notification(&self, file_id: u32) -> Option<u64> {
let path = self.file_path(file_id);
let size = path.metadata().ok()?.len();

match reth_fs_util::remove_file(self.file_path(file_id)) {
Ok(()) => {
debug!("Notification was removed from the storage");
true
Some(size)
}
Err(err) => {
debug!(?err, "Failed to remove notification from the storage");
false
None
}
}
}
Expand Down Expand Up @@ -77,36 +84,42 @@ impl Storage {
///
/// # Returns
///
/// Number of removed notifications.
/// Number of removed notifications and the total size of the removed files in bytes.
pub(super) fn remove_notifications(
&self,
file_ids: impl IntoIterator<Item = u32>,
) -> eyre::Result<usize> {
let mut deleted = 0;
) -> eyre::Result<(usize, u64)> {
let mut deleted_total = 0;
let mut deleted_size = 0;

for id in file_ids {
if self.remove_notification(id) {
deleted += 1;
if let Some(size) = self.remove_notification(id) {
deleted_total += 1;
deleted_size += size;
}
}

Ok(deleted)
Ok((deleted_total, deleted_size))
}

pub(super) fn iter_notifications(
&self,
range: RangeInclusive<u32>,
) -> impl Iterator<Item = eyre::Result<(u32, ExExNotification)>> + '_ {
) -> impl Iterator<Item = eyre::Result<(u32, u64, ExExNotification)>> + '_ {
range.map(move |id| {
let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?;
let (notification, size) =
self.read_notification(id)?.ok_or_eyre("notification not found")?;

Ok((id, notification))
Ok((id, size, notification))
})
}

/// Reads the notification from the file with the given ID.
#[instrument(target = "exex::wal::storage", skip(self))]
pub(super) fn read_notification(&self, file_id: u32) -> eyre::Result<Option<ExExNotification>> {
pub(super) fn read_notification(
&self,
file_id: u32,
) -> eyre::Result<Option<(ExExNotification, u64)>> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Reading notification from WAL");

Expand All @@ -115,31 +128,38 @@ impl Storage {
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
};
let size = file.metadata()?.len();

// Deserialize using the bincode- and msgpack-compatible serde wrapper
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> =
rmp_serde::decode::from_read(&mut file)?;

Ok(Some(notification.into()))
Ok(Some((notification.into(), size)))
}

/// Writes the notification to the file with the given ID.
///
/// # Returns
///
/// The size of the file that was written in bytes.
#[instrument(target = "exex::wal::storage", skip(self, notification))]
pub(super) fn write_notification(
&self,
file_id: u32,
notification: &ExExNotification,
) -> eyre::Result<()> {
) -> eyre::Result<u64> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Writing notification to WAL");

// Serialize using the bincode- and msgpack-compatible serde wrapper
let notification =
reth_exex_types::serde_bincode_compat::ExExNotification::from(notification);

Ok(reth_fs_util::atomic_write_file(&file_path, |file| {
reth_fs_util::atomic_write_file(&file_path, |file| {
rmp_serde::encode::write(file, &notification)
})?)
})?;

Ok(file_path.metadata()?.len())
}
}

Expand Down

0 comments on commit f8c12ff

Please sign in to comment.