From da23e1ce8dd6f5a5efb083ce1adfb7b2f2290667 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:23:02 +0200 Subject: [PATCH 1/3] chore(provider): rename to `get_in_memory_or_storage_by_block_range_while` (#11421) --- .../src/providers/blockchain_provider.rs | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 100646b0ed21..c329b593ea31 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -122,7 +122,8 @@ impl BlockchainProvider2 { (start, end) } - /// Fetches a range of data from both in-memory state and storage while a predicate is met. + /// Fetches a range of data from both in-memory state and persistent storage while a predicate + /// is met. /// /// Creates a snapshot of the in-memory chain state and database provider to prevent /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing @@ -132,7 +133,7 @@ impl BlockchainProvider2 { /// user to retrieve the required items from the database using [`RangeInclusive`]. /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory /// state, allowing for selection or filtering for the desired data. - fn fetch_db_mem_range_while( + fn get_in_memory_or_storage_by_block_range_while( &self, range: impl RangeBounds, fetch_db_range: F, @@ -252,7 +253,8 @@ impl BlockchainProvider2 { S: FnOnce(DatabaseProviderRO) -> ProviderResult>, M: Fn(usize, TxNumber, Arc) -> ProviderResult>, { - // Order of instantiation matters. More information on: `fetch_db_mem_range_while`. + // Order of instantiation matters. More information on: + // `get_in_memory_or_storage_by_block_range_while`. 
let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::>(); let provider = self.database.provider()?; @@ -426,7 +428,7 @@ impl HeaderProvider for BlockchainProvider2 { } fn headers_range(&self, range: impl RangeBounds) -> ProviderResult> { - self.fetch_db_mem_range_while( + self.get_in_memory_or_storage_by_block_range_while( range, |db_provider, range, _| db_provider.headers_range(range), |block_state, _| Some(block_state.block().block().header.header().clone()), @@ -446,7 +448,7 @@ impl HeaderProvider for BlockchainProvider2 { &self, range: impl RangeBounds, ) -> ProviderResult> { - self.fetch_db_mem_range_while( + self.get_in_memory_or_storage_by_block_range_while( range, |db_provider, range, _| db_provider.sealed_headers_range(range), |block_state, _| Some(block_state.block().block().header.clone()), @@ -459,7 +461,7 @@ impl HeaderProvider for BlockchainProvider2 { range: impl RangeBounds, predicate: impl FnMut(&SealedHeader) -> bool, ) -> ProviderResult> { - self.fetch_db_mem_range_while( + self.get_in_memory_or_storage_by_block_range_while( range, |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate), |block_state, predicate| { @@ -484,7 +486,7 @@ impl BlockHashReader for BlockchainProvider2 { start: BlockNumber, end: BlockNumber, ) -> ProviderResult> { - self.fetch_db_mem_range_while( + self.get_in_memory_or_storage_by_block_range_while( start..end, |db_provider, inclusive_range, _| { db_provider @@ -654,7 +656,7 @@ impl BlockReader for BlockchainProvider2 { } fn block_range(&self, range: RangeInclusive) -> ProviderResult> { - self.fetch_db_mem_range_while( + self.get_in_memory_or_storage_by_block_range_while( range, |db_provider, range, _| db_provider.block_range(range), |block_state, _| Some(block_state.block().block().clone().unseal()), @@ -666,7 +668,7 @@ impl BlockReader for BlockchainProvider2 { &self, range: RangeInclusive, ) -> ProviderResult> { - self.fetch_db_mem_range_while( + 
self.get_in_memory_or_storage_by_block_range_while( range, |db_provider, range, _| db_provider.block_with_senders_range(range), |block_state, _| Some(block_state.block_with_senders()), @@ -678,7 +680,7 @@ impl BlockReader for BlockchainProvider2 { &self, range: RangeInclusive, ) -> ProviderResult> { - self.fetch_db_mem_range_while( + self.get_in_memory_or_storage_by_block_range_while( range, |db_provider, range, _| db_provider.sealed_block_with_senders_range(range), |block_state, _| Some(block_state.sealed_block_with_senders()), From d36683d1a32559bb167c8f91bfc640f29bc3af31 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 2 Oct 2024 14:39:19 +0200 Subject: [PATCH 2/3] chore(sdk): define traits for primitives `Block` and `BlockBody` (#11411) --- crates/primitives/src/traits/block/body.rs | 152 +++++++++++++++++++++ crates/primitives/src/traits/block/mod.rs | 137 +++++++++++++++++++ crates/primitives/src/traits/mod.rs | 7 + 3 files changed, 296 insertions(+) create mode 100644 crates/primitives/src/traits/block/body.rs create mode 100644 crates/primitives/src/traits/block/mod.rs create mode 100644 crates/primitives/src/traits/mod.rs diff --git a/crates/primitives/src/traits/block/body.rs b/crates/primitives/src/traits/block/body.rs new file mode 100644 index 000000000000..ff8f71b76162 --- /dev/null +++ b/crates/primitives/src/traits/block/body.rs @@ -0,0 +1,152 @@ +//! Block body abstraction. + +use alloc::fmt; +use core::ops; + +use alloy_consensus::{BlockHeader, Transaction, TxType}; +use alloy_primitives::{Address, B256}; + +use crate::{proofs, traits::Block, Requests, Withdrawals}; + +/// Abstraction for block's body. +pub trait BlockBody: + Clone + + fmt::Debug + + PartialEq + + Eq + + Default + + serde::Serialize + + for<'de> serde::Deserialize<'de> + + alloy_rlp::Encodable + + alloy_rlp::Decodable +{ + /// Ordered list of signed transactions as committed in block. 
+ // todo: requires trait for signed transaction + type SignedTransaction: Transaction; + + /// Header type (uncle blocks). + type Header: BlockHeader; + + /// Returns reference to transactions in block. + fn transactions(&self) -> &[Self::SignedTransaction]; + + /// Returns [`Withdrawals`] in the block, if any. + // todo: branch out into extension trait + fn withdrawals(&self) -> Option<&Withdrawals>; + + /// Returns reference to uncle block headers. + fn ommers(&self) -> &[Self::Header]; + + /// Returns [`Request`] in block, if any. + fn requests(&self) -> Option<&Requests>; + + /// Create a [`Block`] from the body and its header. + fn into_block>(self, header: Self::Header) -> T { + T::from((header, self)) + } + + /// Calculate the transaction root for the block body. + fn calculate_tx_root(&self) -> B256; + + /// Calculate the ommers root for the block body. + fn calculate_ommers_root(&self) -> B256; + + /// Calculate the withdrawals root for the block body, if withdrawals exist. If there are no + /// withdrawals, this will return `None`. + fn calculate_withdrawals_root(&self) -> Option { + Some(proofs::calculate_withdrawals_root(self.withdrawals()?)) + } + + /// Calculate the requests root for the block body, if requests exist. If there are no + /// requests, this will return `None`. + fn calculate_requests_root(&self) -> Option { + Some(proofs::calculate_requests_root(self.requests()?)) + } + + /// Recover signer addresses for all transactions in the block body. + fn recover_signers(&self) -> Option>; + + /// Returns whether or not the block body contains any blob transactions. + fn has_blob_transactions(&self) -> bool { + self.transactions().iter().any(|tx| tx.ty() as u8 == TxType::Eip4844 as u8) + } + + /// Returns whether or not the block body contains any EIP-7702 transactions. 
+ fn has_eip7702_transactions(&self) -> bool { + self.transactions().iter().any(|tx| tx.ty() as u8 == TxType::Eip7702 as u8) + } + + /// Returns an iterator over all blob transactions of the block + fn blob_transactions_iter(&self) -> impl Iterator + '_ { + self.transactions().iter().filter(|tx| tx.ty() as u8 == TxType::Eip4844 as u8) + } + + /// Returns only the blob transactions, if any, from the block body. + fn blob_transactions(&self) -> Vec<&Self::SignedTransaction> { + self.blob_transactions_iter().collect() + } + + /// Returns an iterator over all blob versioned hashes from the block body. + fn blob_versioned_hashes_iter(&self) -> impl Iterator + '_; + + /// Returns all blob versioned hashes from the block body. + fn blob_versioned_hashes(&self) -> Vec<&B256> { + self.blob_versioned_hashes_iter().collect() + } + + /// Calculates a heuristic for the in-memory size of the [`BlockBody`]. + fn size(&self) -> usize; +} + +impl BlockBody for T +where + T: ops::Deref + + Clone + + fmt::Debug + + PartialEq + + Eq + + Default + + serde::Serialize + + for<'de> serde::Deserialize<'de> + + alloy_rlp::Encodable + + alloy_rlp::Decodable, +{ + type Header = ::Header; + type SignedTransaction = ::SignedTransaction; + + fn transactions(&self) -> &Vec { + self.deref().transactions() + } + + fn withdrawals(&self) -> Option<&Withdrawals> { + self.deref().withdrawals() + } + + fn ommers(&self) -> &Vec { + self.deref().ommers() + } + + fn requests(&self) -> Option<&Requests> { + self.deref().requests() + } + + fn calculate_tx_root(&self) -> B256 { + self.deref().calculate_tx_root() + } + + fn calculate_ommers_root(&self) -> B256 { + self.deref().calculate_ommers_root() + } + + fn recover_signers(&self) -> Option> { + self.deref().recover_signers() + } + + fn blob_versioned_hashes_iter(&self) -> impl Iterator + '_ { + self.deref().blob_versioned_hashes_iter() + } + + fn size(&self) -> usize { + self.deref().size() + } +} diff --git a/crates/primitives/src/traits/block/mod.rs 
b/crates/primitives/src/traits/block/mod.rs new file mode 100644 index 000000000000..451a54c3457c --- /dev/null +++ b/crates/primitives/src/traits/block/mod.rs @@ -0,0 +1,137 @@ +//! Block abstraction. + +pub mod body; + +use alloc::fmt; +use core::ops; + +use alloy_consensus::BlockHeader; +use alloy_primitives::{Address, Sealable, B256}; + +use crate::{traits::BlockBody, BlockWithSenders, SealedBlock, SealedHeader}; + +/// Abstraction of block data type. +pub trait Block: + fmt::Debug + + Clone + + PartialEq + + Eq + + Default + + serde::Serialize + + for<'a> serde::Deserialize<'a> + + From<(Self::Header, Self::Body)> + + Into<(Self::Header, Self::Body)> +{ + /// Header part of the block. + type Header: BlockHeader + Sealable; + + /// The block's body contains the transactions in the block. + type Body: BlockBody; + + /// Returns reference to [`BlockHeader`] type. + fn header(&self) -> &Self::Header; + + /// Returns reference to [`BlockBody`] type. + fn body(&self) -> &Self::Body; + + /// Calculate the header hash and seal the block so that it can't be changed. + fn seal_slow(self) -> SealedBlock { + let (header, body) = self.into(); + let sealed = header.seal_slow(); + let (header, seal) = sealed.into_parts(); + SealedBlock { header: SealedHeader::new(header, seal), body } + } + + /// Seal the block with a known hash. + /// + /// WARNING: This method does not perform validation whether the hash is correct. + fn seal(self, hash: B256) -> SealedBlock { + let (header, body) = self.into(); + SealedBlock { header: SealedHeader::new(header, hash), body } + } + + /// Expensive operation that recovers transaction signer. See + /// [`SealedBlockWithSenders`](reth_primitives::SealedBlockWithSenders). + fn senders(&self) -> Option> { + self.body().recover_signers() + } + + /// Transform into a [`BlockWithSenders`]. 
+ /// + /// # Panics + /// + /// If the number of senders does not match the number of transactions in the block + /// and the signer recovery for one of the transactions fails. + /// + /// Note: this is expected to be called with blocks read from disk. + #[track_caller] + fn with_senders_unchecked(self, senders: Vec
) -> BlockWithSenders { + self.try_with_senders_unchecked(senders).expect("stored block is valid") + } + + /// Transform into a [`BlockWithSenders`] using the given senders. + /// + /// If the number of senders does not match the number of transactions in the block, this falls + /// back to manual recovery, but _without ensuring that the signature has a low `s` value_. + /// See also [`TransactionSigned::recover_signer_unchecked`] + /// + /// Returns an error if a signature is invalid. + #[track_caller] + fn try_with_senders_unchecked( + self, + senders: Vec
, + ) -> Result, Self> { + let senders = if self.body().transactions().len() == senders.len() { + senders + } else { + let Some(senders) = self.body().recover_signers() else { return Err(self) }; + senders + }; + + Ok(BlockWithSenders { block: self, senders }) + } + + /// **Expensive**. Transform into a [`BlockWithSenders`] by recovering senders in the contained + /// transactions. + /// + /// Returns `None` if a transaction is invalid. + fn with_recovered_senders(self) -> Option> { + let senders = self.senders()?; + Some(BlockWithSenders { block: self, senders }) + } + + /// Calculates a heuristic for the in-memory size of the [`Block`]. + fn size(&self) -> usize; +} + +impl Block for T +where + T: ops::Deref + + fmt::Debug + + Clone + + PartialEq + + Eq + + Default + + serde::Serialize + + for<'a> serde::Deserialize<'a> + + From<(::Header, ::Body)> + + Into<(::Header, ::Body)>, +{ + type Header = ::Header; + type Body = ::Body; + + #[inline] + fn header(&self) -> &Self::Header { + self.deref().header() + } + + #[inline] + fn body(&self) -> &Self::Body { + self.deref().body() + } + + #[inline] + fn size(&self) -> usize { + self.deref().size() + } +} diff --git a/crates/primitives/src/traits/mod.rs b/crates/primitives/src/traits/mod.rs new file mode 100644 index 000000000000..8c84c6729753 --- /dev/null +++ b/crates/primitives/src/traits/mod.rs @@ -0,0 +1,7 @@ +//! 
Abstractions of primitive data types + +pub mod block; + +pub use block::{body::BlockBody, Block}; + +pub use alloy_consensus::BlockHeader; From a27150113efda9b1cae7d5260a730a3fae6f6f78 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 2 Oct 2024 16:47:05 +0300 Subject: [PATCH 3/3] feat(exex): WAL metrics --- crates/exex/exex/src/manager.rs | 2 +- crates/exex/exex/src/wal/cache.rs | 44 +++++++++++++++++---- crates/exex/exex/src/wal/metrics.rs | 18 +++++++++ crates/exex/exex/src/wal/mod.rs | 55 ++++++++++++++++++++++----- crates/exex/exex/src/wal/storage.rs | 59 ++++++++++++++++++++--------- 5 files changed, 143 insertions(+), 35 deletions(-) create mode 100644 crates/exex/exex/src/wal/metrics.rs diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 126622c26ac7..b99d5e57dab5 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -162,7 +162,7 @@ impl ExExHandle { /// Metrics for the `ExEx` manager. #[derive(Metrics)] -#[metrics(scope = "exex_manager")] +#[metrics(scope = "exex.manager")] pub struct ExExManagerMetrics { /// Max size of the internal state notifications buffer. max_capacity: Gauge, diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 1a8b914e868f..54750e7047af 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -14,18 +14,25 @@ use reth_exex_types::ExExNotification; #[derive(Debug, Default)] pub struct BlockCache { /// A min heap of `(Block Number, File ID)` tuples. - pub(super) blocks: BinaryHeap>, + /// + /// Contains one highest block in notification. In a notification with both committed and + /// reverted chain, the highest block is chosen between both chains. + pub(super) notification_max_blocks: BinaryHeap>, /// A mapping of committed blocks `Block Hash -> Block`. /// /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per /// block. 
pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>, + /// Block height of the lowest committed block currently in the cache. + pub(super) lowest_committed_block_height: Option<BlockNumber>, + /// Block height of the highest committed block currently in the cache. + pub(super) highest_committed_block_height: Option<BlockNumber>, } impl BlockCache { /// Returns `true` if the cache is empty. pub(super) fn is_empty(&self) -> bool { - self.blocks.is_empty() + self.notification_max_blocks.is_empty() } /// Removes all files from the cache that has notifications with a tip block less than or equal @@ -37,9 +44,11 @@ impl BlockCache { pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> { let mut file_ids = HashSet::default(); - while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() { + while let Some(block @ Reverse((max_block, file_id))) = + self.notification_max_blocks.peek().copied() + { if max_block <= block_number { - let popped_block = self.blocks.pop().unwrap(); + let popped_block = self.notification_max_blocks.pop().unwrap(); debug_assert_eq!(popped_block, block); file_ids.insert(file_id); } else { @@ -47,7 +56,21 @@ impl BlockCache { } } - self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id)); + let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None); + self.committed_blocks.retain(|_, (file_id, block)| { + let retain = !file_ids.contains(file_id); + if retain { + let number = block.block.number; + lowest_committed_block_height = + Some(lowest_committed_block_height.map_or(number, |lowest| number.min(lowest))); + highest_committed_block_height = + Some(highest_committed_block_height.map_or(number, |highest| number.max(highest))); + } + + retain + }); + self.lowest_committed_block_height = lowest_committed_block_height; + self.highest_committed_block_height = highest_committed_block_height; file_ids } @@ -70,7 +93,7 @@ impl BlockCache { let max_block = 
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max(); if let Some(max_block) = max_block { - self.blocks.push(Reverse((max_block, file_id))); + self.notification_max_blocks.push(Reverse((max_block, file_id))); } if let Some(committed_chain) = &committed_chain { @@ -81,12 +104,19 @@ impl BlockCache { }; self.committed_blocks.insert(block.hash(), (file_id, cached_block)); } + + self.highest_committed_block_height = Some(committed_chain.tip().number); } } #[cfg(test)] pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> { - self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect() + self.notification_max_blocks + .clone() + .into_sorted_vec() + .into_iter() + .map(|entry| entry.0) + .collect() } #[cfg(test)] diff --git a/crates/exex/exex/src/wal/metrics.rs b/crates/exex/exex/src/wal/metrics.rs new file mode 100644 index 000000000000..7726fc978d47 --- /dev/null +++ b/crates/exex/exex/src/wal/metrics.rs @@ -0,0 +1,18 @@ +use metrics::Gauge; +use reth_metrics::Metrics; + +/// Metrics for the [WAL](`super::Wal`) +#[derive(Metrics)] +#[metrics(scope = "exex.wal")] +pub(super) struct Metrics { + /// Size of all notifications in WAL in bytes + pub size_bytes: Gauge, + /// Total number of notifications in WAL + pub notifications_total: Gauge, + /// Total number of committed blocks in WAL + pub committed_blocks_total: Gauge, + /// Lowest committed block height in WAL + pub lowest_committed_block_height: Gauge, + /// Highest committed block height in WAL + pub highest_committed_block_height: Gauge, +} diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 4515029dcb3f..e8c9c6bc805e 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -3,8 +3,9 @@ mod cache; pub use cache::BlockCache; mod storage; -use parking_lot::{RwLock, RwLockReadGuard}; pub use storage::Storage; +mod metrics; +use metrics::Metrics; use std::{ path::Path, @@ -16,6 +17,7 @@ use std::{ 
use alloy_eips::BlockNumHash; use alloy_primitives::B256; +use parking_lot::{RwLock, RwLockReadGuard}; use reth_exex_types::ExExNotification; use reth_tracing::tracing::{debug, instrument}; @@ -74,6 +76,7 @@ struct WalInner { storage: Storage, /// WAL block cache. See [`cache::BlockCache`] docs for more details. block_cache: RwLock<BlockCache>, + metrics: Metrics, } impl WalInner { @@ -82,6 +85,7 @@ impl WalInner { next_file_id: AtomicU32::new(0), storage: Storage::new(directory)?, block_cache: RwLock::new(BlockCache::default()), + metrics: Metrics::default(), }; wal.fill_block_cache()?; Ok(wal) @@ -98,9 +102,12 @@ impl WalInner { self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed); let mut block_cache = self.block_cache.write(); + let mut notifications_size = 0; for entry in self.storage.iter_notifications(files_range) { - let (file_id, notification) = entry?; + let (file_id, size, notification) = entry?; + + notifications_size += size; let committed_chain = notification.committed_chain(); let reverted_chain = notification.reverted_chain(); @@ -116,6 +123,8 @@ impl WalInner { block_cache.insert_notification_blocks_with_file_id(file_id, &notification); } + self.update_metrics(&block_cache, notifications_size as i64); + Ok(()) } @@ -127,17 +136,20 @@ impl WalInner { let mut block_cache = self.block_cache.write(); let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); - self.storage.write_notification(file_id, notification)?; + let size = self.storage.write_notification(file_id, notification)?; debug!(?file_id, "Inserting notification blocks into the block cache"); block_cache.insert_notification_blocks_with_file_id(file_id, notification); + self.update_metrics(&block_cache, size as i64); + Ok(()) } #[instrument(target = "exex::wal", skip(self))] fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { - let file_ids = self.block_cache.write().remove_before(to_block.number); + let mut block_cache = self.block_cache.write(); + let file_ids = 
block_cache.remove_before(to_block.number); // Remove notifications from the storage. if file_ids.is_empty() { @@ -145,12 +157,34 @@ impl WalInner { return Ok(()) } - let removed_notifications = self.storage.remove_notifications(file_ids)?; - debug!(?removed_notifications, "Storage was finalized"); + let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?; + debug!(?removed_notifications, ?removed_size, "Storage was finalized"); + + self.update_metrics(&block_cache, -(removed_size as i64)); Ok(()) } + fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) { + if size_delta >= 0 { + self.metrics.size_bytes.increment(size_delta as f64); + } else { + self.metrics.size_bytes.decrement(size_delta.unsigned_abs() as f64); + } + + self.metrics.notifications_total.set(block_cache.notification_max_blocks.len() as f64); + + self.metrics.committed_blocks_total.set(block_cache.committed_blocks.len() as f64); + + if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height { + self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64); + } + + if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height { + self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64); + } + } + /// Returns an iterator over all notifications in the WAL. 
fn iter_notifications( &self, @@ -159,7 +193,7 @@ impl WalInner { return Ok(Box::new(std::iter::empty())) }; - Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1)))) + Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2)))) } } @@ -180,7 +214,10 @@ impl WalHandle { return Ok(None) }; - self.wal.storage.read_notification(file_id) + self.wal + .storage + .read_notification(file_id) + .map(|entry| entry.map(|(notification, _)| notification)) } } @@ -205,7 +242,7 @@ mod tests { wal.inner .storage .iter_notifications(files_range) - .map(|entry| Ok(entry?.1)) + .map(|entry| Ok(entry?.2)) .collect::>() } diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index e921bdac862b..166a9bb4eb6b 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -40,16 +40,23 @@ impl Storage { } /// Removes notification for the given file ID from the storage. + /// + /// # Returns + /// + /// The size of the file that was removed in bytes, if any. #[instrument(target = "exex::wal::storage", skip(self))] - fn remove_notification(&self, file_id: u32) -> bool { + fn remove_notification(&self, file_id: u32) -> Option { + let path = self.file_path(file_id); + let size = path.metadata().ok()?.len(); + match reth_fs_util::remove_file(self.file_path(file_id)) { Ok(()) => { debug!("Notification was removed from the storage"); - true + Some(size) } Err(err) => { debug!(?err, "Failed to remove notification from the storage"); - false + None } } } @@ -77,36 +84,42 @@ impl Storage { /// /// # Returns /// - /// Number of removed notifications. + /// Number of removed notifications and the total size of the removed files in bytes. 
pub(super) fn remove_notifications( &self, file_ids: impl IntoIterator, - ) -> eyre::Result { - let mut deleted = 0; + ) -> eyre::Result<(usize, u64)> { + let mut deleted_total = 0; + let mut deleted_size = 0; for id in file_ids { - if self.remove_notification(id) { - deleted += 1; + if let Some(size) = self.remove_notification(id) { + deleted_total += 1; + deleted_size += size; } } - Ok(deleted) + Ok((deleted_total, deleted_size)) } pub(super) fn iter_notifications( &self, range: RangeInclusive, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { range.map(move |id| { - let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?; + let (notification, size) = + self.read_notification(id)?.ok_or_eyre("notification not found")?; - Ok((id, notification)) + Ok((id, size, notification)) }) } /// Reads the notification from the file with the given ID. #[instrument(target = "exex::wal::storage", skip(self))] - pub(super) fn read_notification(&self, file_id: u32) -> eyre::Result> { + pub(super) fn read_notification( + &self, + file_id: u32, + ) -> eyre::Result> { let file_path = self.file_path(file_id); debug!(?file_path, "Reading notification from WAL"); @@ -115,21 +128,26 @@ impl Storage { Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(err) => return Err(err.into()), }; + let size = file.metadata()?.len(); // Deserialize using the bincode- and msgpack-compatible serde wrapper let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> = rmp_serde::decode::from_read(&mut file)?; - Ok(Some(notification.into())) + Ok(Some((notification.into(), size))) } /// Writes the notification to the file with the given ID. + /// + /// # Returns + /// + /// The size of the file that was written in bytes. 
#[instrument(target = "exex::wal::storage", skip(self, notification))] pub(super) fn write_notification( &self, file_id: u32, notification: &ExExNotification, - ) -> eyre::Result<()> { + ) -> eyre::Result<u64> { let file_path = self.file_path(file_id); debug!(?file_path, "Writing notification to WAL"); @@ -137,9 +155,11 @@ impl Storage { let notification = reth_exex_types::serde_bincode_compat::ExExNotification::from(notification); - Ok(reth_fs_util::atomic_write_file(&file_path, |file| { + reth_fs_util::atomic_write_file(&file_path, |file| { rmp_serde::encode::write(file, &notification) - })?) + })?; + + Ok(file_path.metadata()?.len()) } } @@ -177,7 +197,10 @@ mod tests { let file_id = 0; storage.write_notification(file_id, &notification)?; let deserialized_notification = storage.read_notification(file_id)?; - assert_eq!(deserialized_notification, Some(notification)); + assert_eq!( + deserialized_notification.map(|(notification, _)| notification), + Some(notification) + ); Ok(()) }