From eb00e6d7898059c40b81a1005011b25c9f81a597 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 31 Oct 2023 11:35:11 +0000 Subject: [PATCH 01/25] add snapshots_path to ChainPath --- bin/reth/src/dirs.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bin/reth/src/dirs.rs b/bin/reth/src/dirs.rs index c93e8234ba4d..0bb05125717c 100644 --- a/bin/reth/src/dirs.rs +++ b/bin/reth/src/dirs.rs @@ -264,6 +264,11 @@ impl ChainPath { self.0.join("db").into() } + /// Returns the path to the snapshots directory for this chain. + pub fn snapshots_path(&self) -> PathBuf { + self.0.join("snapshots").into() + } + /// Returns the path to the reth p2p secret key for this chain. pub fn p2p_secret_path(&self) -> PathBuf { self.0.join("discovery-secret").into() From a8ef221e63ee8141a20d4b04ddc91c348df5224a Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 31 Oct 2023 11:36:02 +0000 Subject: [PATCH 02/25] move HighestSnapshots to primitives crate --- crates/primitives/src/snapshot/mod.rs | 15 +++++++++++++++ crates/snapshot/src/lib.rs | 3 +-- crates/snapshot/src/snapshotter.rs | 20 +++----------------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/crates/primitives/src/snapshot/mod.rs b/crates/primitives/src/snapshot/mod.rs index d8fc8db53624..28c21432d1b3 100644 --- a/crates/primitives/src/snapshot/mod.rs +++ b/crates/primitives/src/snapshot/mod.rs @@ -4,9 +4,24 @@ mod compression; mod filters; mod segment; +use alloy_primitives::BlockNumber; pub use compression::Compression; pub use filters::{Filters, InclusionFilter, PerfectHashingFunction}; pub use segment::{SegmentHeader, SnapshotSegment}; /// Default snapshot block count. pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000; + +/// Highest snapshotted block numbers, per data part. +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] +pub struct HighestSnapshots { + /// Highest snapshotted block of headers, inclusive. + /// If [`None`], no snapshot is available. + pub headers: Option, + /// Highest snapshotted block of receipts, inclusive. + /// If [`None`], no snapshot is available. + pub receipts: Option, + /// Highest snapshotted block of transactions, inclusive. + /// If [`None`], no snapshot is available. + pub transactions: Option, +} diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 18b22bdb54a6..82f42b2d4c05 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -15,6 +15,5 @@ mod snapshotter; pub use error::SnapshotterError; pub use snapshotter::{ - HighestSnapshots, HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult, - SnapshotterWithResult, + HighestSnapshotsTracker, SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult, }; diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index b850f68a1c74..51c9940120b8 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -3,7 +3,7 @@ use crate::SnapshotterError; use reth_db::database::Database; use reth_interfaces::{RethError, RethResult}; -use reth_primitives::{BlockNumber, ChainSpec, TxNumber}; +use reth_primitives::{snapshot::HighestSnapshots, BlockNumber, ChainSpec, TxNumber}; use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory}; use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; use tokio::sync::watch; @@ -28,20 +28,6 @@ pub struct Snapshotter { /// Tracker for the latest [`HighestSnapshots`] value. pub type HighestSnapshotsTracker = watch::Receiver>; -/// Highest snapshotted block numbers, per data part. -#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] -pub struct HighestSnapshots { - /// Highest snapshotted block of headers, inclusive. - /// If [`None`], no snapshot is available. - pub headers: Option, - /// Highest snapshotted block of receipts, inclusive. - /// If [`None`], no snapshot is available. - pub receipts: Option, - /// Highest snapshotted block of transactions, inclusive. - /// If [`None`], no snapshot is available. - pub transactions: Option, -} - /// Snapshot targets, per data part, measured in [`BlockNumber`] and [`TxNumber`], if applicable. #[derive(Debug, Clone, Eq, PartialEq)] pub struct SnapshotTargets { @@ -240,13 +226,13 @@ impl Snapshotter { #[cfg(test)] mod tests { - use crate::{snapshotter::SnapshotTargets, HighestSnapshots, Snapshotter}; + use crate::{snapshotter::SnapshotTargets, Snapshotter}; use assert_matches::assert_matches; use reth_interfaces::{ test_utils::{generators, generators::random_block_range}, RethError, }; - use reth_primitives::{B256, MAINNET}; + use reth_primitives::{snapshot::HighestSnapshots, B256, MAINNET}; use reth_stages::test_utils::TestTransaction; use tokio::sync::watch; From f1f32d77f878c9a1303171a34d223b5268bf98ff Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 31 Oct 2023 11:36:30 +0000 Subject: [PATCH 03/25] add with_highest_tracker to SnapshotProvider --- .../provider/src/providers/snapshot/manager.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index a065f6d1db48..59d93af44e46 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -4,11 +4,13 @@ use dashmap::DashMap; use reth_interfaces::RethResult; use reth_nippy_jar::NippyJar; use reth_primitives::{ - snapshot::BLOCKS_PER_SNAPSHOT, Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, - Header, SealedHeader, SnapshotSegment, TransactionMeta, TransactionSigned, - TransactionSignedNoHash, TxHash, TxNumber, B256, U256, + snapshot::{HighestSnapshots, BLOCKS_PER_SNAPSHOT}, + Address, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, Header, SealedHeader, + SnapshotSegment, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, + B256, U256, }; use std::{ops::RangeBounds, path::PathBuf}; +use tokio::sync::watch; /// SnapshotProvider #[derive(Debug, Default)] @@ -16,9 +18,19 @@ pub struct SnapshotProvider { /// Maintains a map which allows for concurrent access to different `NippyJars`, over different /// segments and ranges. map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, + highest_tracker: Option>>, } impl SnapshotProvider { + /// Adds a highest snapshot tracker to the provider + pub fn with_highest_tracker( + mut self, + highest_tracker: Option>>, + ) -> Self { + self.highest_tracker = highest_tracker; + self + } + /// Gets the provider of the requested segment and range. pub fn get_segment_provider( &self, From f5cadf89ecf02d47166b15fe390124b6efa529ba Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 31 Oct 2023 11:37:05 +0000 Subject: [PATCH 04/25] add a shared snapshot provvider to providerfactory and dbprovider --- crates/blockchain-tree/src/blockchain_tree.rs | 2 + .../provider/src/providers/database/mod.rs | 41 ++++++++++++++++--- .../src/providers/database/provider.rs | 19 +++++++-- 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index bd1d72c971e4..4085f649b8e9 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -1071,6 +1071,7 @@ impl BlockchainTree { let provider = DatabaseProvider::new_rw( self.externals.db.tx_mut()?, self.externals.chain_spec.clone(), + None, ); let (blocks, state) = chain.into_inner(); @@ -1116,6 +1117,7 @@ impl BlockchainTree { let provider = DatabaseProvider::new_rw( self.externals.db.tx_mut()?, self.externals.chain_spec.clone(), + None, ); let tip = provider.last_block_number()?; diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 140de2a2ff43..71fe7e64073a 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -1,5 +1,8 @@ use crate::{ - providers::state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, + providers::{ + state::{historical::HistoricalStateProvider, latest::LatestStateProvider}, + SnapshotProvider, + }, traits::{BlockSource, ReceiptProvider}, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, EvmEnvProvider, HeaderProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProviderBox, @@ -8,6 +11,7 @@ use crate::{ use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv}; use reth_interfaces::{db::LogLevel, RethError, RethResult}; use reth_primitives::{ + snapshot::HighestSnapshots, stage::{StageCheckpoint, StageId}, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo, ChainSpec, Header, PruneCheckpoint, PruneSegment, Receipt, SealedBlock, SealedHeader, @@ -19,6 +23,7 @@ use std::{ ops::{RangeBounds, RangeInclusive}, sync::Arc, }; +use tokio::sync::watch; use tracing::trace; mod provider; @@ -33,6 +38,8 @@ pub struct ProviderFactory { db: DB, /// Chain spec chain_spec: Arc, + /// Snapshot Provider + snapshot_provider: Option>, } impl ProviderFactory { @@ -40,7 +47,11 @@ impl ProviderFactory { /// database using different types of providers. Example: [`HeaderProvider`] /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. pub fn provider(&self) -> RethResult> { - Ok(DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone())) + Ok(DatabaseProvider::new( + self.db.tx()?, + self.chain_spec.clone(), + self.snapshot_provider.clone(), + )) } /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating @@ -48,14 +59,29 @@ impl ProviderFactory { /// [`BlockHashReader`]. This may fail if the inner read/write database transaction fails to /// open. pub fn provider_rw(&self) -> RethResult> { - Ok(DatabaseProviderRW(DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone()))) + Ok(DatabaseProviderRW(DatabaseProvider::new_rw( + self.db.tx_mut()?, + self.chain_spec.clone(), + self.snapshot_provider.clone(), + ))) } } impl ProviderFactory { /// create new database provider pub fn new(db: DB, chain_spec: Arc) -> Self { - Self { db, chain_spec } + Self { db, chain_spec, snapshot_provider: None } + } + + /// database provider comes with a shared snapshot provider + pub fn with_snapshots( + mut self, + highest_snapshot_tracker: watch::Receiver>, + ) -> Self { + self.snapshot_provider = Some(Arc::new( + SnapshotProvider::default().with_highest_tracker(Some(highest_snapshot_tracker)), + )); + self } } @@ -70,13 +96,18 @@ impl ProviderFactory { Ok(ProviderFactory:: { db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?, chain_spec, + snapshot_provider: None, }) } } impl Clone for ProviderFactory { fn clone(&self) -> Self { - Self { db: self.db.clone(), chain_spec: Arc::clone(&self.chain_spec) } + Self { + db: self.db.clone(), + chain_spec: Arc::clone(&self.chain_spec), + snapshot_provider: self.snapshot_provider.clone(), + } } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index f92a37c4be1c..4a946455466b 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1,5 +1,6 @@ use crate::{ bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit}, + providers::SnapshotProvider, traits::{ AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter, }, @@ -96,12 +97,18 @@ pub struct DatabaseProvider { tx: TX, /// Chain spec chain_spec: Arc, + /// Snapshot provider + snapshot_provider: Option>, } impl DatabaseProvider { /// Creates a provider with an inner read-write transaction. - pub fn new_rw(tx: TX, chain_spec: Arc) -> Self { - Self { tx, chain_spec } + pub fn new_rw( + tx: TX, + chain_spec: Arc, + snapshot_provider: Option>, + ) -> Self { + Self { tx, chain_spec, snapshot_provider } } } @@ -155,8 +162,12 @@ where impl DatabaseProvider { /// Creates a provider with an inner read-only transaction. - pub fn new(tx: TX, chain_spec: Arc) -> Self { - Self { tx, chain_spec } + pub fn new( + tx: TX, + chain_spec: Arc, + snapshot_provider: Option>, + ) -> Self { + Self { tx, chain_spec, snapshot_provider } } /// Consume `DbTx` or `DbTxMut`. From 1acd28f2c8e3b88ffeded04b27f9774a037032fa Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 31 Oct 2023 11:45:10 +0000 Subject: [PATCH 05/25] add snapshot provider to shared blockchain_db --- bin/reth/src/node/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 5aa011bce06a..9061448d338d 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -295,7 +295,9 @@ impl NodeCommand { let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?; // setup the blockchain provider - let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); + let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None); + let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)) + .with_snapshots(highest_snapshots_rx.clone()); let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?; let blob_store = InMemoryBlobStore::default(); let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) @@ -450,8 +452,6 @@ impl NodeCommand { None }; - let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None); - let mut hooks = EngineHooks::new(); let pruner_events = if let Some(prune_config) = prune_config { From 470a6b6a651b7e210e8836e47963ea2f18b2475b Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 31 Oct 2023 11:48:10 +0000 Subject: [PATCH 06/25] allow unused for now --- crates/storage/provider/src/providers/database/provider.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 4a946455466b..fe63caac61d6 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -98,6 +98,7 @@ pub struct DatabaseProvider { /// Chain spec chain_spec: Arc, /// Snapshot provider + #[allow(unused)] snapshot_provider: Option>, } From 113ef23de1c09c008ced21718c514369d5e0a17a Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 31 Oct 2023 12:40:52 +0000 Subject: [PATCH 07/25] add get_highest_snapshot to SnapshotProvider --- crates/primitives/src/snapshot/mod.rs | 11 +++++++++++ .../provider/src/providers/snapshot/manager.rs | 8 ++++++++ 2 files changed, 19 insertions(+) diff --git a/crates/primitives/src/snapshot/mod.rs b/crates/primitives/src/snapshot/mod.rs index 28c21432d1b3..f73ba9874d77 100644 --- a/crates/primitives/src/snapshot/mod.rs +++ b/crates/primitives/src/snapshot/mod.rs @@ -25,3 +25,14 @@ pub struct HighestSnapshots { /// If [`None`], no snapshot is available. pub transactions: Option, } + +impl HighestSnapshots { + /// Returns the highest snapshot if it exists for a segment + pub fn highest(&self, segment: SnapshotSegment) -> Option { + match segment { + SnapshotSegment::Headers => self.headers, + SnapshotSegment::Transactions => self.transactions, + SnapshotSegment::Receipts => self.receipts, + } + } +} diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 59d93af44e46..946e8349a7cb 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -18,6 +18,7 @@ pub struct SnapshotProvider { /// Maintains a map which allows for concurrent access to different `NippyJars`, over different /// segments and ranges. map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, + /// Tracks the latest and highest snapshot of every segment. highest_tracker: Option>>, } @@ -56,6 +57,13 @@ impl SnapshotProvider { self.get_segment_provider(segment, block, path) } + + /// Gets the highest snapshot if it exists for a snapshot segment. + pub fn get_highest_snapshot(&self, segment: SnapshotSegment) -> Option { + self.highest_tracker + .as_ref() + .and_then(|tracker| tracker.borrow().and_then(|highest| highest.highest(segment))) + } } impl HeaderProvider for SnapshotProvider { From 73fd00fc40dafd597e0e44bcc0d2fc12f2c4d69d Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 2 Nov 2023 16:29:11 +0000 Subject: [PATCH 08/25] move hihgest snapshot channel inside snapshotter --- bin/reth/src/node/mod.rs | 23 +++++++++++++---------- crates/snapshot/src/snapshotter.rs | 27 ++++++++++++++------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 9061448d338d..da4fe6a4ea76 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -294,10 +294,16 @@ impl NodeCommand { // fetch the head block from the database let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?; + // configure snapshotter + let snapshotter = reth_snapshot::Snapshotter::new( + db.clone(), + self.chain.clone(), + self.chain.snapshot_block_interval, + ); + // setup the blockchain provider - let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None); let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)) - .with_snapshots(highest_snapshots_rx.clone()); + .with_snapshots(snapshotter.highest_snapshot_receiver()); let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?; let blob_store = InMemoryBlobStore::default(); let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) @@ -455,7 +461,11 @@ impl NodeCommand { let mut hooks = EngineHooks::new(); let pruner_events = if let Some(prune_config) = prune_config { - let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx); + let mut pruner = self.build_pruner( + &prune_config, + db.clone(), + snapshotter.highest_snapshot_receiver(), + ); let events = pruner.events(); hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); @@ -466,13 +476,6 @@ impl NodeCommand { Either::Right(stream::empty()) }; - let _snapshotter = reth_snapshot::Snapshotter::new( - db, - self.chain.clone(), - self.chain.snapshot_block_interval, - highest_snapshots_tx, - ); - // Configure the consensus engine let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( client, diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 51c9940120b8..d47f4f69f902 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -74,12 +74,9 @@ impl SnapshotTargets { impl Snapshotter { /// Creates a new [Snapshotter]. - pub fn new( - db: DB, - chain_spec: Arc, - block_interval: u64, - highest_snapshots_tracker: watch::Sender>, - ) -> Self { + pub fn new(db: DB, chain_spec: Arc, block_interval: u64) -> Self { + let (highest_snapshots_tracker, _) = watch::channel(None); + let snapshotter = Self { provider_factory: ProviderFactory::new(db, chain_spec), // TODO(alexey): fill from on-disk snapshot data @@ -112,6 +109,11 @@ impl Snapshotter { }); } + /// Returns a new [`HighestSnapshotsTracker`]. + pub fn highest_snapshot_receiver(&self) -> HighestSnapshotsTracker { + self.highest_snapshots_tracker.subscribe() + } + /// Run the snapshotter pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult { debug_assert!(targets.is_multiple_of_block_interval(self.block_interval)); @@ -234,17 +236,17 @@ mod tests { }; use reth_primitives::{snapshot::HighestSnapshots, B256, MAINNET}; use reth_stages::test_utils::TestTransaction; - use tokio::sync::watch; #[test] fn new() { let tx = TestTransaction::default(); - let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None); - assert_eq!(*highest_snapshots_rx.borrow(), None); + let snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2); - Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, highest_snapshots_tx); - assert_eq!(*highest_snapshots_rx.borrow(), Some(HighestSnapshots::default())); + assert_eq!( + *snapshotter.highest_snapshot_receiver().borrow(), + Some(HighestSnapshots::default()) + ); } #[test] @@ -255,8 +257,7 @@ mod tests { let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3); tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - let mut snapshotter = - Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2, watch::channel(None).0); + let mut snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2); // Snapshot targets has data per part up to the passed finalized block number, // respecting the block interval From 2fef51f286f9c660d74e92f8a96bc39fe91b6164 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 2 Nov 2023 16:53:04 +0000 Subject: [PATCH 09/25] add default receiver to snapshotter --- crates/snapshot/src/snapshotter.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index d47f4f69f902..3ba37d5f1316 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -18,9 +18,14 @@ pub type SnapshotterWithResult = (Snapshotter, SnapshotterResult); /// Snapshotting routine. Main snapshotting logic happens in [Snapshotter::run]. #[derive(Debug)] pub struct Snapshotter { + /// Provider factory provider_factory: ProviderFactory, + /// Highest snapshot block number for each highest_snapshots: HighestSnapshots, - highest_snapshots_tracker: watch::Sender>, + /// Channel sender to notify other components of the new highest snapshot values + highest_snapshots_notifier: watch::Sender>, + /// Channel receiver to be cloned and shared that already comes with the newest value + highest_snapshots_tracker: HighestSnapshotsTracker, /// Block interval after which the snapshot is taken. block_interval: u64, } @@ -75,12 +80,13 @@ impl SnapshotTargets { impl Snapshotter { /// Creates a new [Snapshotter]. pub fn new(db: DB, chain_spec: Arc, block_interval: u64) -> Self { - let (highest_snapshots_tracker, _) = watch::channel(None); + let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None); let snapshotter = Self { provider_factory: ProviderFactory::new(db, chain_spec), // TODO(alexey): fill from on-disk snapshot data highest_snapshots: HighestSnapshots::default(), + highest_snapshots_notifier, highest_snapshots_tracker, block_interval, }; @@ -104,14 +110,14 @@ impl Snapshotter { } fn update_highest_snapshots_tracker(&self) { - let _ = self.highest_snapshots_tracker.send(Some(self.highest_snapshots)).map_err(|_| { + let _ = self.highest_snapshots_notifier.send(Some(self.highest_snapshots)).map_err(|_| { warn!(target: "snapshot", "Highest snapshots channel closed"); }); } /// Returns a new [`HighestSnapshotsTracker`]. pub fn highest_snapshot_receiver(&self) -> HighestSnapshotsTracker { - self.highest_snapshots_tracker.subscribe() + self.highest_snapshots_tracker.clone() } /// Run the snapshotter From 33dc284d7f263bd8ce0a7021c48fa71cbd09e2c6 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 2 Nov 2023 17:07:23 +0000 Subject: [PATCH 10/25] replace with with_snapshot_provider on db provider --- crates/blockchain-tree/src/blockchain_tree.rs | 2 -- .../provider/src/providers/database/mod.rs | 24 +++++++++++-------- .../src/providers/database/provider.rs | 22 ++++++++--------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 4085f649b8e9..bd1d72c971e4 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -1071,7 +1071,6 @@ impl BlockchainTree { let provider = DatabaseProvider::new_rw( self.externals.db.tx_mut()?, self.externals.chain_spec.clone(), - None, ); let (blocks, state) = chain.into_inner(); @@ -1117,7 +1116,6 @@ impl BlockchainTree { let provider = DatabaseProvider::new_rw( self.externals.db.tx_mut()?, self.externals.chain_spec.clone(), - None, ); let tip = provider.last_block_number()?; diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 71fe7e64073a..8c3cce805e55 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -47,11 +47,13 @@ impl ProviderFactory { /// database using different types of providers. Example: [`HeaderProvider`] /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. pub fn provider(&self) -> RethResult> { - Ok(DatabaseProvider::new( - self.db.tx()?, - self.chain_spec.clone(), - self.snapshot_provider.clone(), - )) + let mut provider = DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone()); + + if let Some(snapshot_provider) = &self.snapshot_provider { + provider = provider.with_snapshot_provider(snapshot_provider.clone()); + } + + Ok(provider) } /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating @@ -59,11 +61,13 @@ impl ProviderFactory { /// [`BlockHashReader`]. This may fail if the inner read/write database transaction fails to /// open. pub fn provider_rw(&self) -> RethResult> { - Ok(DatabaseProviderRW(DatabaseProvider::new_rw( - self.db.tx_mut()?, - self.chain_spec.clone(), - self.snapshot_provider.clone(), - ))) + let mut provider = DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone()); + + if let Some(snapshot_provider) = &self.snapshot_provider { + provider = provider.with_snapshot_provider(snapshot_provider.clone()); + } + + Ok(DatabaseProviderRW(provider)) } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index fe63caac61d6..452ac4247cf6 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -104,12 +104,8 @@ pub struct DatabaseProvider { impl DatabaseProvider { /// Creates a provider with an inner read-write transaction. - pub fn new_rw( - tx: TX, - chain_spec: Arc, - snapshot_provider: Option>, - ) -> Self { - Self { tx, chain_spec, snapshot_provider } + pub fn new_rw(tx: TX, chain_spec: Arc) -> Self { + Self { tx, chain_spec, snapshot_provider: None } } } @@ -163,12 +159,14 @@ where impl DatabaseProvider { /// Creates a provider with an inner read-only transaction. - pub fn new( - tx: TX, - chain_spec: Arc, - snapshot_provider: Option>, - ) -> Self { - Self { tx, chain_spec, snapshot_provider } + pub fn new(tx: TX, chain_spec: Arc) -> Self { + Self { tx, chain_spec, snapshot_provider: None } + } + + /// Creates a new [`Self`] with access to a [`SnapshotProvider`]. + pub fn with_snapshot_provider(mut self, snapshot_provider: Arc) -> Self { + self.snapshot_provider = Some(snapshot_provider); + self } /// Consume `DbTx` or `DbTxMut`. From cda1ca79dcc202bd26e010f611b155206ca7df1a Mon Sep 17 00:00:00 2001 From: joshieDo Date: Fri, 3 Nov 2023 13:34:56 +0000 Subject: [PATCH 11/25] use strum for SnapshotSegment, compression and filters --- crates/primitives/src/snapshot/compression.rs | 8 ++- crates/primitives/src/snapshot/filters.rs | 9 ++- crates/primitives/src/snapshot/segment.rs | 56 ++++++++++--------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/crates/primitives/src/snapshot/compression.rs b/crates/primitives/src/snapshot/compression.rs index c67e3f63bc9a..69fe4b2a4328 100644 --- a/crates/primitives/src/snapshot/compression.rs +++ b/crates/primitives/src/snapshot/compression.rs @@ -1,11 +1,17 @@ -#[derive(Debug, Copy, Clone, Default)] +use strum::AsRefStr; + +#[derive(Debug, Copy, Clone, Default, AsRefStr)] #[cfg_attr(feature = "clap", derive(clap::ValueEnum))] #[allow(missing_docs)] /// Snapshot compression pub enum Compression { + #[strum(serialize = "lz4")] Lz4, + #[strum(serialize = "zstd")] Zstd, + #[strum(serialize = "zstd-dict")] ZstdWithDictionary, + #[strum(serialize = "uncompressed")] #[default] Uncompressed, } diff --git a/crates/primitives/src/snapshot/filters.rs b/crates/primitives/src/snapshot/filters.rs index e9716ac707da..3443d474706e 100644 --- a/crates/primitives/src/snapshot/filters.rs +++ b/crates/primitives/src/snapshot/filters.rs @@ -1,3 +1,5 @@ +use strum::AsRefStr; + #[derive(Debug, Copy, Clone)] /// Snapshot filters. pub enum Filters { @@ -14,20 +16,23 @@ impl Filters { } } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, AsRefStr)] #[cfg_attr(feature = "clap", derive(clap::ValueEnum))] /// Snapshot inclusion filter. Also see [Filters]. pub enum InclusionFilter { + #[strum(serialize = "cuckoo")] /// Cuckoo filter Cuckoo, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, AsRefStr)] #[cfg_attr(feature = "clap", derive(clap::ValueEnum))] /// Snapshot perfect hashing function. Also see [Filters]. pub enum PerfectHashingFunction { + #[strum(serialize = "fmph")] /// Fingerprint-Based Minimal Perfect Hash Function Fmph, + #[strum(serialize = "gofmph")] /// Fingerprint-Based Minimal Perfect Hash Function with Group Optimization GoFmph, } diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs index 8a86768ede68..318d7501b1bb 100644 --- a/crates/primitives/src/snapshot/segment.rs +++ b/crates/primitives/src/snapshot/segment.rs @@ -1,17 +1,35 @@ -use crate::{snapshot::PerfectHashingFunction, BlockNumber, TxNumber}; +use crate::{ + snapshot::{Compression, Filters, InclusionFilter}, + BlockNumber, TxNumber, +}; use serde::{Deserialize, Serialize}; -use std::{ops::RangeInclusive, path::PathBuf}; +use std::{ops::RangeInclusive, path::PathBuf, str::FromStr}; +use strum::{AsRefStr, EnumString}; -use super::{Compression, Filters, InclusionFilter}; - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, Serialize)] +#[derive( + Debug, + Copy, + Clone, + Eq, + PartialEq, + Hash, + Ord, + PartialOrd, + Deserialize, + Serialize, + EnumString, + AsRefStr, +)] #[cfg_attr(feature = "clap", derive(clap::ValueEnum))] /// Segment of the data that can be snapshotted. pub enum SnapshotSegment { + #[strum(serialize = "headers")] /// Snapshot segment responsible for the `CanonicalHeaders`, `Headers`, `HeaderTD` tables. Headers, + #[strum(serialize = "transactions")] /// Snapshot segment responsible for the `Transactions` table. Transactions, + #[strum(serialize = "receipts")] /// Snapshot segment responsible for the `Receipts` table. Receipts, } @@ -44,35 +62,23 @@ impl SnapshotSegment { compression: Compression, range: &RangeInclusive, ) -> PathBuf { - let segment_name = match self { - SnapshotSegment::Headers => "headers", - SnapshotSegment::Transactions => "transactions", - SnapshotSegment::Receipts => "receipts", - }; + let segment_name = self.as_ref(); + let filters_name = match filters { Filters::WithFilters(inclusion_filter, phf) => { - let inclusion_filter = match inclusion_filter { - InclusionFilter::Cuckoo => "cuckoo", - }; - let phf = match phf { - PerfectHashingFunction::Fmph => "fmph", - PerfectHashingFunction::GoFmph => "gofmph", - }; - format!("{inclusion_filter}-{phf}") + format!("{}-{}", inclusion_filter.as_ref(), phf.as_ref()) } Filters::WithoutFilters => "none".to_string(), }; - let compression_name = match compression { - Compression::Lz4 => "lz4", - Compression::Zstd => "zstd", - Compression::ZstdWithDictionary => "zstd-dict", - Compression::Uncompressed => "uncompressed", - }; + // ATTENTION: if changing the name format, be sure to reflect those changes in + // [`Self::parse_filename`.] format!( - "snapshot_{segment_name}_{}_{}_{filters_name}_{compression_name}", + "snapshot_{segment_name}_{}_{}_{}_{}", range.start(), range.end(), + filters_name, + compression.as_ref() ) .into() } From ec89704e1f99425ae9a866feaf7c37de2f259922 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Fri, 3 Nov 2023 13:36:06 +0000 Subject: [PATCH 12/25] snapshotter takes a directory and reads from it --- Cargo.lock | 1 + bin/reth/src/node/mod.rs | 5 +- crates/primitives/src/snapshot/segment.rs | 16 +++++ crates/snapshot/Cargo.toml | 2 +- crates/snapshot/src/snapshotter.rs | 65 +++++++++++++++---- .../provider/src/providers/database/mod.rs | 5 +- .../src/providers/snapshot/manager.rs | 11 +++- 7 files changed, 88 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05c28fd0dc68..166cd3a4abbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6543,6 +6543,7 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-stages", + "tempfile", "thiserror", "tokio", "tracing", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index da4fe6a4ea76..108031f556a7 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -297,13 +297,14 @@ impl NodeCommand { // configure snapshotter let snapshotter = reth_snapshot::Snapshotter::new( db.clone(), + data_dir.snapshots_path(), self.chain.clone(), self.chain.snapshot_block_interval, - ); + )?; // setup the blockchain provider let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)) - .with_snapshots(snapshotter.highest_snapshot_receiver()); + .with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver()); let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?; let blob_store = InMemoryBlobStore::default(); let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain)) diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs index 318d7501b1bb..e59268473cb1 100644 --- a/crates/primitives/src/snapshot/segment.rs +++ b/crates/primitives/src/snapshot/segment.rs @@ -82,6 +82,22 @@ impl SnapshotSegment { ) .into() } + + /// Takes a filename and parses the [`SnapshotSegment`] and its inclusive range. + pub fn parse_filename(name: &str) -> Option<(Self, RangeInclusive)> { + let parts: Vec<&str> = name.split('_').collect(); + if let (Ok(segment), true) = (Self::from_str(parts[1]), parts.len() >= 4) { + let start: u64 = parts[2].parse().unwrap_or(0); + let end: u64 = parts[3].parse().unwrap_or(0); + + if start <= end || parts[0] != "snapshot" { + return None + } + + return Some((segment, start..=end)) + } + None + } } /// A segment header that contains information common to all segments. Used for storage. diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml index 5597331fd2d2..6455863348c4 100644 --- a/crates/snapshot/Cargo.toml +++ b/crates/snapshot/Cargo.toml @@ -32,7 +32,7 @@ reth-db = { workspace = true, features = ["test-utils"] } reth-stages = { path = "../stages", features = ["test-utils"] } # misc - +tempfile.workspace = true assert_matches.workspace = true [features] diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 3ba37d5f1316..e818b894c2fa 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -3,9 +3,11 @@ use crate::SnapshotterError; use reth_db::database::Database; use reth_interfaces::{RethError, RethResult}; -use reth_primitives::{snapshot::HighestSnapshots, BlockNumber, ChainSpec, TxNumber}; +use reth_primitives::{ + snapshot::HighestSnapshots, BlockNumber, ChainSpec, SnapshotSegment, TxNumber, +}; use reth_provider::{BlockReader, DatabaseProviderRO, ProviderFactory}; -use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; +use std::{collections::HashMap, ops::RangeInclusive, path::PathBuf, sync::Arc}; use tokio::sync::watch; use tracing::warn; @@ -20,6 +22,8 @@ pub type SnapshotterWithResult = (Snapshotter, SnapshotterResult); pub struct Snapshotter { /// Provider factory provider_factory: ProviderFactory, + /// Directory where snapshots are located + snapshots_path: PathBuf, /// Highest snapshot block number for each highest_snapshots: HighestSnapshots, /// Channel sender to notify other components of the new highest snapshot values @@ -79,11 +83,17 @@ impl SnapshotTargets { impl Snapshotter { /// Creates a new [Snapshotter]. - pub fn new(db: DB, chain_spec: Arc, block_interval: u64) -> Self { + pub fn new( + db: DB, + snapshots_path: PathBuf, + chain_spec: Arc, + block_interval: u64, + ) -> RethResult { let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None); - let snapshotter = Self { + let mut snapshotter = Self { provider_factory: ProviderFactory::new(db, chain_spec), + snapshots_path, // TODO(alexey): fill from on-disk snapshot data highest_snapshots: HighestSnapshots::default(), highest_snapshots_notifier, @@ -91,9 +101,9 @@ impl Snapshotter { block_interval, }; - snapshotter.update_highest_snapshots_tracker(); + snapshotter.update_highest_snapshots_tracker()?; - snapshotter + Ok(snapshotter) } #[cfg(test)] @@ -109,10 +119,38 @@ impl Snapshotter { } } - fn update_highest_snapshots_tracker(&self) { + /// Looks into the snapshot directory to find the highest snapshotted block of each segment, and + /// notifies every tracker. + fn update_highest_snapshots_tracker(&mut self) -> RethResult<()> { + // It walks over the directory and parses the snapshot filenames extracing `SnapshotSegment` + // and their inclusive range. It then takes the maximum block number for each specific + // segment. + for (segment, range) in std::fs::read_dir(&self.snapshots_path) + .map_err(|err| RethError::Custom(err.to_string()))? + .filter_map(Result::ok) + .filter_map(|entry| { + if let Ok(true) = entry.metadata().map(|metadata| metadata.is_file()) { + return SnapshotSegment::parse_filename(&entry.file_name().to_string_lossy()) + } + None + }) + { + let max_segment_block = match segment { + SnapshotSegment::Headers => &mut self.highest_snapshots.headers, + SnapshotSegment::Transactions => &mut self.highest_snapshots.transactions, + SnapshotSegment::Receipts => &mut self.highest_snapshots.receipts, + }; + + if max_segment_block.map_or(true, |block| block < *range.end()) { + *max_segment_block = Some(*range.end()); + } + } + let _ = self.highest_snapshots_notifier.send(Some(self.highest_snapshots)).map_err(|_| { warn!(target: "snapshot", "Highest snapshots channel closed"); }); + + Ok(()) } /// Returns a new [`HighestSnapshotsTracker`]. @@ -127,7 +165,7 @@ impl Snapshotter { // TODO(alexey): snapshot logic - self.update_highest_snapshots_tracker(); + self.update_highest_snapshots_tracker()?; Ok(targets) } @@ -246,8 +284,10 @@ mod tests { #[test] fn new() { let tx = TestTransaction::default(); - - let snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2); + let snapshots_dir = tempfile::TempDir::new().unwrap(); + let snapshotter = + Snapshotter::new(tx.inner_raw(), snapshots_dir.into_path(), MAINNET.clone(), 2) + .unwrap(); assert_eq!( *snapshotter.highest_snapshot_receiver().borrow(), @@ -258,12 +298,15 @@ mod tests { #[test] fn get_snapshot_targets() { let tx = TestTransaction::default(); + let snapshots_dir = tempfile::TempDir::new().unwrap(); let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3); tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); - let mut snapshotter = Snapshotter::new(tx.inner_raw(), MAINNET.clone(), 2); + let mut snapshotter = + Snapshotter::new(tx.inner_raw(), snapshots_dir.into_path(), MAINNET.clone(), 2) + .unwrap(); // Snapshot targets has data per part up to the passed finalized block number, // respecting the block interval diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 8c3cce805e55..ccbe0a97b42a 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -21,6 +21,7 @@ use reth_primitives::{ use revm::primitives::{BlockEnv, CfgEnv}; use std::{ ops::{RangeBounds, RangeInclusive}, + path::PathBuf, sync::Arc, }; use tokio::sync::watch; @@ -80,10 +81,12 @@ impl ProviderFactory { /// database provider comes with a shared snapshot provider pub fn with_snapshots( mut self, + snapshots_path: PathBuf, highest_snapshot_tracker: watch::Receiver>, ) -> Self { self.snapshot_provider = Some(Arc::new( - SnapshotProvider::default().with_highest_tracker(Some(highest_snapshot_tracker)), + SnapshotProvider::new(snapshots_path) + .with_highest_tracker(Some(highest_snapshot_tracker)), )); self } diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 946e8349a7cb..f4c19d24f1e4 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -20,9 +20,16 @@ pub struct SnapshotProvider { map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, /// Tracks the latest and highest snapshot of every segment. highest_tracker: Option>>, + /// Directory where snapshots are located + path: PathBuf, } impl SnapshotProvider { + /// Creates a new [`SnapshotProvider`]. + pub fn new(path: PathBuf) -> Self { + Self { map: Default::default(), highest_tracker: None, path } + } + /// Adds a highest snapshot tracker to the provider pub fn with_highest_tracker( mut self, @@ -50,9 +57,9 @@ impl SnapshotProvider { if let Some(path) = &path { self.map.insert(key, LoadedJar::new(NippyJar::load(path)?)?); } else { - path = Some(segment.filename( + path = Some(self.path.join(segment.filename( &((snapshot * BLOCKS_PER_SNAPSHOT)..=((snapshot + 1) * BLOCKS_PER_SNAPSHOT - 1)), - )); + ))); } self.get_segment_provider(segment, block, path) From e87c64d6c3679dfc8aa989254f9d1047c2d8e3a7 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Fri, 3 Nov 2023 14:10:57 +0000 Subject: [PATCH 13/25] default snapshot filename doesnt have configuration --- bin/reth/src/db/snapshots/headers.rs | 39 ++++++++++++++------- bin/reth/src/db/snapshots/receipts.rs | 41 ++++++++++++++--------- bin/reth/src/db/snapshots/transactions.rs | 40 +++++++++++++--------- crates/primitives/src/snapshot/segment.rs | 26 ++++++-------- crates/snapshot/src/segments/mod.rs | 4 +-- 5 files changed, 89 insertions(+), 61 deletions(-) diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs index 6533dd881759..cc3bd0a0b152 100644 --- a/bin/reth/src/db/snapshots/headers.rs +++ b/bin/reth/src/db/snapshots/headers.rs @@ -12,8 +12,11 @@ use reth_primitives::{ use reth_provider::{ providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory, }; -use reth_snapshot::segments::{Headers, Segment}; -use std::{path::Path, sync::Arc}; +use reth_snapshot::{segments, segments::Segment}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; impl Command { pub(crate) fn generate_headers_snapshot( @@ -23,15 +26,24 @@ impl Command { inclusion_filter: InclusionFilter, phf: PerfectHashingFunction, ) -> eyre::Result<()> { - let segment = Headers::new( - compression, - if self.with_filters { - Filters::WithFilters(inclusion_filter, phf) - } else { - Filters::WithoutFilters - }, - ); - segment.snapshot::(provider, self.from..=(self.from + self.block_interval - 1))?; + let range = self.from..=(self.from + self.block_interval - 1); + let filters = if self.with_filters { + Filters::WithFilters(inclusion_filter, phf) + } else { + Filters::WithoutFilters + }; + + let segment = segments::Headers::new(compression, filters); + + segment.snapshot::(provider, range.clone())?; + + // Default name doesn't have any configuration + let default_name: PathBuf = SnapshotSegment::Headers.filename(&range).into(); + let new_name: PathBuf = SnapshotSegment::Headers + .filename_with_configuration(filters, compression, &range) + .into(); + + std::fs::rename(default_name, new_name)?; Ok(()) } @@ -55,8 +67,9 @@ impl Command { let mut row_indexes = range.clone().collect::>(); let mut rng = rand::thread_rng(); - let path = - SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range); + let path = SnapshotSegment::Headers + .filename_with_configuration(filters, compression, &range) + .into(); let provider = SnapshotProvider::default(); let jar_provider = provider.get_segment_provider(SnapshotSegment::Headers, self.from, Some(path))?; diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs index ffe09814e2aa..70a95f31b9b8 100644 --- a/bin/reth/src/db/snapshots/receipts.rs +++ b/bin/reth/src/db/snapshots/receipts.rs @@ -14,7 +14,10 @@ use reth_provider::{ ReceiptProvider, TransactionsProvider, TransactionsProviderExt, }; use reth_snapshot::{segments, segments::Segment}; -use std::{path::Path, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; impl Command { pub(crate) fn generate_receipts_snapshot( @@ -24,15 +27,24 @@ impl Command { inclusion_filter: InclusionFilter, phf: PerfectHashingFunction, ) -> eyre::Result<()> { - let segment = segments::Receipts::new( - compression, - if self.with_filters { - Filters::WithFilters(inclusion_filter, phf) - } else { - Filters::WithoutFilters - }, - ); - segment.snapshot::(provider, self.from..=(self.from + self.block_interval - 1))?; + let range = self.from..=(self.from + self.block_interval - 1); + let filters = if self.with_filters { + Filters::WithFilters(inclusion_filter, phf) + } else { + Filters::WithoutFilters + }; + + let segment = segments::Receipts::new(compression, filters); + + segment.snapshot::(provider, range.clone())?; + + // Default name doesn't have any configuration + let default_name: PathBuf = SnapshotSegment::Receipts.filename(&range).into(); + let new_name: PathBuf = SnapshotSegment::Receipts + .filename_with_configuration(filters, compression, &range) + .into(); + + std::fs::rename(default_name, new_name)?; Ok(()) } @@ -62,11 +74,10 @@ impl Command { let mut row_indexes = tx_range.clone().collect::>(); - let path = SnapshotSegment::Receipts.filename_with_configuration( - filters, - compression, - &block_range, - ); + let path = SnapshotSegment::Receipts + .filename_with_configuration(filters, compression, &block_range) + .into(); + let provider = SnapshotProvider::default(); let jar_provider = provider.get_segment_provider(SnapshotSegment::Receipts, self.from, Some(path))?; diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs index a52c33ddb7f9..a2ced80a3f6a 100644 --- a/bin/reth/src/db/snapshots/transactions.rs +++ b/bin/reth/src/db/snapshots/transactions.rs @@ -14,7 +14,10 @@ use reth_provider::{ TransactionsProvider, TransactionsProviderExt, }; use reth_snapshot::{segments, segments::Segment}; -use std::{path::Path, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; impl Command { pub(crate) fn generate_transactions_snapshot( @@ -24,15 +27,24 @@ impl Command { inclusion_filter: InclusionFilter, phf: PerfectHashingFunction, ) -> eyre::Result<()> { - let segment = segments::Transactions::new( - compression, - if self.with_filters { - Filters::WithFilters(inclusion_filter, phf) - } else { - Filters::WithoutFilters - }, - ); - segment.snapshot::(provider, self.from..=(self.from + self.block_interval - 1))?; + let range = self.from..=(self.from + self.block_interval - 1); + let filters = if self.with_filters { + Filters::WithFilters(inclusion_filter, phf) + } else { + Filters::WithoutFilters + }; + + let segment = segments::Transactions::new(compression, filters); + + segment.snapshot::(provider, range.clone())?; + + // Default name doesn't have any configuration + let default_name: PathBuf = SnapshotSegment::Transactions.filename(&range).into(); + let new_name: PathBuf = SnapshotSegment::Transactions + .filename_with_configuration(filters, compression, &range) + .into(); + + std::fs::rename(default_name, new_name)?; Ok(()) } @@ -62,11 +74,9 @@ impl Command { let mut row_indexes = tx_range.clone().collect::>(); - let path = SnapshotSegment::Transactions.filename_with_configuration( - filters, - compression, - &block_range, - ); + let path = SnapshotSegment::Transactions + .filename_with_configuration(filters, compression, &block_range) + .into(); let provider = SnapshotProvider::default(); let jar_provider = provider.get_segment_provider(SnapshotSegment::Transactions, self.from, Some(path))?; diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs index e59268473cb1..1854f10921b2 100644 --- a/crates/primitives/src/snapshot/segment.rs +++ b/crates/primitives/src/snapshot/segment.rs @@ -3,7 +3,7 @@ use crate::{ BlockNumber, TxNumber, }; use serde::{Deserialize, Serialize}; -use std::{ops::RangeInclusive, path::PathBuf, str::FromStr}; +use std::{ops::RangeInclusive, str::FromStr}; use strum::{AsRefStr, EnumString}; #[derive( @@ -36,7 +36,7 @@ pub enum SnapshotSegment { impl SnapshotSegment { /// Returns the default configuration of the segment. - const fn config(&self) -> (Filters, Compression) { + pub const fn config(&self) -> (Filters, Compression) { let default_config = ( Filters::WithFilters(InclusionFilter::Cuckoo, super::PerfectHashingFunction::Fmph), Compression::Lz4, @@ -50,19 +50,20 @@ impl SnapshotSegment { } /// Returns the default file name for the provided segment and range. - pub fn filename(&self, range: &RangeInclusive) -> PathBuf { - let (filters, compression) = self.config(); - self.filename_with_configuration(filters, compression, range) + pub fn filename(&self, range: &RangeInclusive) -> String { + // ATTENTION: if changing the name format, be sure to reflect those changes in + // [`Self::parse_filename`.] + format!("snapshot_{}_{}_{}", self.as_ref(), range.start(), range.end(),) } - /// Returns file name for the provided segment, filters, compression and range. + /// Returns file name for the provided segment and range, alongisde filters, compression. pub fn filename_with_configuration( &self, filters: Filters, compression: Compression, range: &RangeInclusive, - ) -> PathBuf { - let segment_name = self.as_ref(); + ) -> String { + let prefix = self.filename(range); let filters_name = match filters { Filters::WithFilters(inclusion_filter, phf) => { @@ -73,14 +74,7 @@ impl SnapshotSegment { // ATTENTION: if changing the name format, be sure to reflect those changes in // [`Self::parse_filename`.] - format!( - "snapshot_{segment_name}_{}_{}_{}_{}", - range.start(), - range.end(), - filters_name, - compression.as_ref() - ) - .into() + format!("{prefix}_{}_{}", filters_name, compression.as_ref()) } /// Takes a filename and parses the [`SnapshotSegment`] and its inclusive range. diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs index 9a8bb462789f..6293c3896c10 100644 --- a/crates/snapshot/src/segments/mod.rs +++ b/crates/snapshot/src/segments/mod.rs @@ -19,7 +19,7 @@ use reth_primitives::{ BlockNumber, SnapshotSegment, }; use reth_provider::{DatabaseProviderRO, TransactionsProviderExt}; -use std::ops::RangeInclusive; +use std::{ops::RangeInclusive, path::Path}; pub(crate) type Rows = [Vec>; COLUMNS]; @@ -61,7 +61,7 @@ pub(crate) fn prepare_jar( let tx_range = provider.transaction_range_by_block_range(block_range.clone())?; let mut nippy_jar = NippyJar::new( COLUMNS, - &segment.filename_with_configuration(filters, compression, &block_range), + Path::new(segment.filename(&block_range).as_str()), SegmentHeader::new(block_range, tx_range, segment), ); From 99c1fa505f9eedd8a64cd712c6af4027de040581 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Fri, 3 Nov 2023 17:08:58 +0000 Subject: [PATCH 14/25] create snapshots directory if it doesnt exist --- crates/snapshot/src/snapshotter.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index e818b894c2fa..c3ad10710588 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -89,6 +89,16 @@ impl Snapshotter { chain_spec: Arc, block_interval: u64, ) -> RethResult { + // Create directory for snapshots if it doesn't exist. + if !snapshots_path.exists() { + std::fs::create_dir_all(&snapshots_path).map_err(|e| { + RethError::Custom(format!( + "Could not create snapshots directory {}: {e}", + snapshots_path.display() + )) + })?; + } + let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None); let mut snapshotter = Self { From d0699f786b6e864321f2053143fdd87f6dec996e Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 13 Nov 2023 11:40:46 +0000 Subject: [PATCH 15/25] Update crates/snapshot/src/snapshotter.rs Co-authored-by: Alexey Shekhirin --- crates/snapshot/src/snapshotter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 3ba37d5f1316..c675a3ed33b0 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -20,7 +20,7 @@ pub type SnapshotterWithResult = (Snapshotter, SnapshotterResult); pub struct Snapshotter { /// Provider factory provider_factory: ProviderFactory, - /// Highest snapshot block number for each + /// Highest snapshotted block numbers for each segment highest_snapshots: HighestSnapshots, /// Channel sender to notify other components of the new highest snapshot values highest_snapshots_notifier: watch::Sender>, From 892b2600a5fea8aff4bbc40d5ce64423e9eb2ab6 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 13 Nov 2023 11:40:54 +0000 Subject: [PATCH 16/25] Update crates/snapshot/src/snapshotter.rs Co-authored-by: Alexey Shekhirin --- crates/snapshot/src/snapshotter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index c675a3ed33b0..6bc722f0f688 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -22,7 +22,7 @@ pub struct Snapshotter { provider_factory: ProviderFactory, /// Highest snapshotted block numbers for each segment highest_snapshots: HighestSnapshots, - /// Channel sender to notify other components of the new highest snapshot values + /// Channel sender to notify other components of the new highest snapshots highest_snapshots_notifier: watch::Sender>, /// Channel receiver to be cloned and shared that already comes with the newest value highest_snapshots_tracker: HighestSnapshotsTracker, From 756d00b51bb2fc1cdd551078301c9e78a0af7d8a Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 13 Nov 2023 11:41:07 +0000 Subject: [PATCH 17/25] Update crates/storage/provider/src/providers/snapshot/manager.rs Co-authored-by: Alexey Shekhirin --- crates/storage/provider/src/providers/snapshot/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 946e8349a7cb..1b26f1db633d 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -18,7 +18,7 @@ pub struct SnapshotProvider { /// Maintains a map which allows for concurrent access to different `NippyJars`, over different /// segments and ranges. map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, - /// Tracks the latest and highest snapshot of every segment. + /// Tracks the highest snapshot of every segment. highest_tracker: Option>>, } From 5891d0a3c9160969bd0a382932fc49986cde67fd Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 13 Nov 2023 11:53:58 +0000 Subject: [PATCH 18/25] Update crates/primitives/src/snapshot/segment.rs Co-authored-by: Alexey Shekhirin --- crates/primitives/src/snapshot/segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/primitives/src/snapshot/segment.rs b/crates/primitives/src/snapshot/segment.rs index 1854f10921b2..309d2c4ba7fe 100644 --- a/crates/primitives/src/snapshot/segment.rs +++ b/crates/primitives/src/snapshot/segment.rs @@ -52,7 +52,7 @@ impl SnapshotSegment { /// Returns the default file name for the provided segment and range. pub fn filename(&self, range: &RangeInclusive) -> String { // ATTENTION: if changing the name format, be sure to reflect those changes in - // [`Self::parse_filename`.] + // [`Self::parse_filename`]. format!("snapshot_{}_{}_{}", self.as_ref(), range.start(), range.end(),) } From d2c21e92ba5fec58adaf2b6e83ea766aab9a1c9d Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 13 Nov 2023 11:54:12 +0000 Subject: [PATCH 19/25] Update crates/snapshot/src/snapshotter.rs Co-authored-by: Alexey Shekhirin --- crates/snapshot/src/snapshotter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index c3ad10710588..c3e587343eac 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -132,7 +132,7 @@ impl Snapshotter { /// Looks into the snapshot directory to find the highest snapshotted block of each segment, and /// notifies every tracker. fn update_highest_snapshots_tracker(&mut self) -> RethResult<()> { - // It walks over the directory and parses the snapshot filenames extracing `SnapshotSegment` + // It walks over the directory and parses the snapshot filenames extracting `SnapshotSegment` // and their inclusive range. It then takes the maximum block number for each specific // segment. for (segment, range) in std::fs::read_dir(&self.snapshots_path) From 4574c68d4241da84dc9fcde263d5ea61b2af5bbc Mon Sep 17 00:00:00 2001 From: joshieDo Date: Mon, 13 Nov 2023 11:58:53 +0000 Subject: [PATCH 20/25] add fn block_range to snapshot cmd --- bin/reth/src/db/snapshots/headers.rs | 4 ++-- bin/reth/src/db/snapshots/mod.rs | 7 ++++++- bin/reth/src/db/snapshots/receipts.rs | 2 +- bin/reth/src/db/snapshots/transactions.rs | 2 +- crates/snapshot/src/snapshotter.rs | 6 +++--- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs index cc3bd0a0b152..932711ff189e 100644 --- a/bin/reth/src/db/snapshots/headers.rs +++ b/bin/reth/src/db/snapshots/headers.rs @@ -26,7 +26,7 @@ impl Command { inclusion_filter: InclusionFilter, phf: PerfectHashingFunction, ) -> eyre::Result<()> { - let range = self.from..=(self.from + self.block_interval - 1); + let range = self.block_range(); let filters = if self.with_filters { Filters::WithFilters(inclusion_filter, phf) } else { @@ -63,7 +63,7 @@ impl Command { Filters::WithoutFilters }; - let range = self.from..=(self.from + self.block_interval - 1); + let range = self.block_range(); let mut row_indexes = range.clone().collect::>(); let mut rng = rand::thread_rng(); diff --git a/bin/reth/src/db/snapshots/mod.rs b/bin/reth/src/db/snapshots/mod.rs index efce4878393a..80f0813c539d 100644 --- a/bin/reth/src/db/snapshots/mod.rs +++ b/bin/reth/src/db/snapshots/mod.rs @@ -7,7 +7,7 @@ use reth_primitives::{ BlockNumber, ChainSpec, SnapshotSegment, }; use reth_provider::ProviderFactory; -use std::{path::Path, sync::Arc}; +use std::{ops::RangeInclusive, path::Path, sync::Arc}; mod bench; mod headers; @@ -130,4 +130,9 @@ impl Command { Ok(()) } + + /// Gives out the inclusive block range for the snapshot requested by the user. + fn block_range(&self) -> RangeInclusive { + self.from..=(self.from + self.block_interval - 1) + } } diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs index 70a95f31b9b8..bf4889421a31 100644 --- a/bin/reth/src/db/snapshots/receipts.rs +++ b/bin/reth/src/db/snapshots/receipts.rs @@ -27,7 +27,7 @@ impl Command { inclusion_filter: InclusionFilter, phf: PerfectHashingFunction, ) -> eyre::Result<()> { - let range = self.from..=(self.from + self.block_interval - 1); + let range = self.block_range(); let filters = if self.with_filters { Filters::WithFilters(inclusion_filter, phf) } else { diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs index a2ced80a3f6a..44094a95fcdf 100644 --- a/bin/reth/src/db/snapshots/transactions.rs +++ b/bin/reth/src/db/snapshots/transactions.rs @@ -27,7 +27,7 @@ impl Command { inclusion_filter: InclusionFilter, phf: PerfectHashingFunction, ) -> eyre::Result<()> { - let range = self.from..=(self.from + self.block_interval - 1); + let range = self.block_range(); let filters = if self.with_filters { Filters::WithFilters(inclusion_filter, phf) } else { diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index d29500f40a30..a6f85a48b9fc 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -132,9 +132,9 @@ impl Snapshotter { /// Looks into the snapshot directory to find the highest snapshotted block of each segment, and /// notifies every tracker. fn update_highest_snapshots_tracker(&mut self) -> RethResult<()> { - // It walks over the directory and parses the snapshot filenames extracting `SnapshotSegment` - // and their inclusive range. It then takes the maximum block number for each specific - // segment. + // It walks over the directory and parses the snapshot filenames extracting + // `SnapshotSegment` and their inclusive range. It then takes the maximum block + // number for each specific segment. for (segment, range) in std::fs::read_dir(&self.snapshots_path) .map_err(|err| RethError::Custom(err.to_string()))? .filter_map(Result::ok) From d47812a728d66fcde315aba901ceb7cac572b373 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Mon, 13 Nov 2023 12:10:26 +0000 Subject: [PATCH 21/25] add read_dir to primitives fs --- crates/interfaces/src/error.rs | 6 ++++++ crates/primitives/src/fs.rs | 17 ++++++++++++++++- crates/snapshot/src/snapshotter.rs | 10 ++-------- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/crates/interfaces/src/error.rs b/crates/interfaces/src/error.rs index b972124fdba4..e40a1abd5f67 100644 --- a/crates/interfaces/src/error.rs +++ b/crates/interfaces/src/error.rs @@ -39,6 +39,12 @@ impl From for RethError { } } +impl From for RethError { + fn from(err: reth_primitives::fs::FsPathError) -> Self { + RethError::Custom(err.to_string()) + } +} + // We don't want these types to be too large because they're used in a lot of places. const _SIZE_ASSERTIONS: () = { // Main error. diff --git a/crates/primitives/src/fs.rs b/crates/primitives/src/fs.rs index f31b279c5b58..cf7825217d6a 100644 --- a/crates/primitives/src/fs.rs +++ b/crates/primitives/src/fs.rs @@ -1,6 +1,7 @@ //! Wrapper for `std::fs` methods use std::{ - fs, io, + fs::{self, ReadDir}, + io, path::{Path, PathBuf}, }; @@ -30,6 +31,9 @@ pub enum FsPathError { /// Provides additional path context for [`std::fs::remove_dir`]. #[error("failed to remove dir {path:?}: {source}")] RemoveDir { source: io::Error, path: PathBuf }, + /// Provides additional path context for [`std::fs::read_dir`]. + #[error("failed to read dir {path:?}: {source}")] + ReadDir { source: io::Error, path: PathBuf }, /// Provides additional path context for [`std::fs::File::open`]. #[error("failed to open file {path:?}: {source}")] Open { source: io::Error, path: PathBuf }, @@ -77,6 +81,11 @@ impl FsPathError { FsPathError::RemoveDir { source, path: path.into() } } + /// Returns the complementary error variant for [`std::fs::read_dir`]. + pub fn read_dir(source: io::Error, path: impl Into) -> Self { + FsPathError::ReadDir { source, path: path.into() } + } + /// Returns the complementary error variant for [`std::fs::File::open`]. pub fn open(source: io::Error, path: impl Into) -> Self { FsPathError::Open { source, path: path.into() } @@ -108,3 +117,9 @@ pub fn create_dir_all(path: impl AsRef) -> Result<()> { let path = path.as_ref(); fs::create_dir_all(path).map_err(|err| FsPathError::create_dir(err, path)) } + +/// Wrapper for `std::fs::read_dir` +pub fn read_dir(path: impl AsRef) -> Result { + let path = path.as_ref(); + fs::read_dir(path).map_err(|err| FsPathError::read_dir(err, path)) +} diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index a6f85a48b9fc..c8790d336adf 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -91,12 +91,7 @@ impl Snapshotter { ) -> RethResult { // Create directory for snapshots if it doesn't exist. if !snapshots_path.exists() { - std::fs::create_dir_all(&snapshots_path).map_err(|e| { - RethError::Custom(format!( - "Could not create snapshots directory {}: {e}", - snapshots_path.display() - )) - })?; + reth_primitives::fs::create_dir_all(&snapshots_path)?; } let (highest_snapshots_notifier, highest_snapshots_tracker) = watch::channel(None); @@ -135,8 +130,7 @@ impl Snapshotter { // It walks over the directory and parses the snapshot filenames extracting // `SnapshotSegment` and their inclusive range. It then takes the maximum block // number for each specific segment. - for (segment, range) in std::fs::read_dir(&self.snapshots_path) - .map_err(|err| RethError::Custom(err.to_string()))? + for (segment, range) in reth_primitives::fs::read_dir(&self.snapshots_path)? .filter_map(Result::ok) .filter_map(|entry| { if let Ok(true) = entry.metadata().map(|metadata| metadata.is_file()) { From 7054744956f05904cd1c11cb01c649c7d24f2a9c Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 14 Nov 2023 15:01:20 +0000 Subject: [PATCH 22/25] add reth_primitives::fs::rename --- bin/reth/src/db/snapshots/headers.rs | 10 ++++------ bin/reth/src/db/snapshots/receipts.rs | 10 ++++------ bin/reth/src/db/snapshots/transactions.rs | 10 ++++------ crates/primitives/src/fs.rs | 15 +++++++++++++++ 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs index 932711ff189e..ee89995c2c30 100644 --- a/bin/reth/src/db/snapshots/headers.rs +++ b/bin/reth/src/db/snapshots/headers.rs @@ -38,12 +38,10 @@ impl Command { segment.snapshot::(provider, range.clone())?; // Default name doesn't have any configuration - let default_name: PathBuf = SnapshotSegment::Headers.filename(&range).into(); - let new_name: PathBuf = SnapshotSegment::Headers - .filename_with_configuration(filters, compression, &range) - .into(); - - std::fs::rename(default_name, new_name)?; + reth_primitives::fs::rename( + SnapshotSegment::Headers.filename(&range), + SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range), + )?; Ok(()) } diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs index bf4889421a31..9c3b94ca4580 100644 --- a/bin/reth/src/db/snapshots/receipts.rs +++ b/bin/reth/src/db/snapshots/receipts.rs @@ -39,12 +39,10 @@ impl Command { segment.snapshot::(provider, range.clone())?; // Default name doesn't have any configuration - let default_name: PathBuf = SnapshotSegment::Receipts.filename(&range).into(); - let new_name: PathBuf = SnapshotSegment::Receipts - .filename_with_configuration(filters, compression, &range) - .into(); - - std::fs::rename(default_name, new_name)?; + reth_primitives::fs::rename( + SnapshotSegment::Receipts.filename(&range), + SnapshotSegment::Receipts.filename_with_configuration(filters, compression, &range), + )?; Ok(()) } diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs index 44094a95fcdf..b5bd726fd9e4 100644 --- a/bin/reth/src/db/snapshots/transactions.rs +++ b/bin/reth/src/db/snapshots/transactions.rs @@ -39,12 +39,10 @@ impl Command { segment.snapshot::(provider, range.clone())?; // Default name doesn't have any configuration - let default_name: PathBuf = SnapshotSegment::Transactions.filename(&range).into(); - let new_name: PathBuf = SnapshotSegment::Transactions - .filename_with_configuration(filters, compression, &range) - .into(); - - std::fs::rename(default_name, new_name)?; + reth_primitives::fs::rename( + SnapshotSegment::Transactions.filename(&range), + SnapshotSegment::Transactions.filename_with_configuration(filters, compression, &range), + )?; Ok(()) } diff --git a/crates/primitives/src/fs.rs b/crates/primitives/src/fs.rs index cf7825217d6a..3388d477003d 100644 --- a/crates/primitives/src/fs.rs +++ b/crates/primitives/src/fs.rs @@ -34,6 +34,9 @@ pub enum FsPathError { /// Provides additional path context for [`std::fs::read_dir`]. #[error("failed to read dir {path:?}: {source}")] ReadDir { source: io::Error, path: PathBuf }, + /// Provides additional context for [`std::fs::rename`]. + #[error("failed to rename from {from:?} to {to:?}: {source}")] + Rename { source: io::Error, from: PathBuf, to: PathBuf }, /// Provides additional path context for [`std::fs::File::open`]. #[error("failed to open file {path:?}: {source}")] Open { source: io::Error, path: PathBuf }, @@ -90,6 +93,11 @@ impl FsPathError { pub fn open(source: io::Error, path: impl Into) -> Self { FsPathError::Open { source, path: path.into() } } + + /// Returns the complementary error variant for [`std::fs::rename`]. + pub fn rename(source: io::Error, from: impl Into, to: impl Into) -> Self { + FsPathError::Rename { source, from: from.into(), to: to.into() } + } } type Result = std::result::Result; @@ -123,3 +131,10 @@ pub fn read_dir(path: impl AsRef) -> Result { let path = path.as_ref(); fs::read_dir(path).map_err(|err| FsPathError::read_dir(err, path)) } + +/// Wrapper for `std::fs::rename` +pub fn rename(from: impl AsRef, to: impl AsRef) -> Result<()> { + let from = from.as_ref(); + let to = to.as_ref(); + fs::rename(from, to).map_err(|err| FsPathError::rename(err, from, to)) +} From 77094e5060333385e75aee14aaca708dc56c669f Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 14 Nov 2023 15:09:11 +0000 Subject: [PATCH 23/25] clippy --- bin/reth/src/cli/mod.rs | 2 +- bin/reth/src/db/snapshots/headers.rs | 2 +- bin/reth/src/db/snapshots/receipts.rs | 2 +- bin/reth/src/db/snapshots/transactions.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index d48afb627cc5..1361b53d2d11 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -363,7 +363,7 @@ mod tests { let end = format!("reth/logs/{}", SUPPORTED_CHAINS[0]); assert!(log_dir.as_ref().ends_with(end), "{:?}", log_dir); - let mut iter = SUPPORTED_CHAINS.into_iter(); + let mut iter = SUPPORTED_CHAINS.iter(); iter.next(); for chain in iter { let mut reth = Cli::<()>::try_parse_from(["reth", "node", "--chain", chain]).unwrap(); diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs index ee89995c2c30..8b737e833b22 100644 --- a/bin/reth/src/db/snapshots/headers.rs +++ b/bin/reth/src/db/snapshots/headers.rs @@ -14,7 +14,7 @@ use reth_provider::{ }; use reth_snapshot::{segments, segments::Segment}; use std::{ - path::{Path, PathBuf}, + path::{Path}, sync::Arc, }; diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs index 9c3b94ca4580..e555c0beb1d1 100644 --- a/bin/reth/src/db/snapshots/receipts.rs +++ b/bin/reth/src/db/snapshots/receipts.rs @@ -15,7 +15,7 @@ use reth_provider::{ }; use reth_snapshot::{segments, segments::Segment}; use std::{ - path::{Path, PathBuf}, + path::{Path}, sync::Arc, }; diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs index b5bd726fd9e4..54b6da65df0c 100644 --- a/bin/reth/src/db/snapshots/transactions.rs +++ b/bin/reth/src/db/snapshots/transactions.rs @@ -15,7 +15,7 @@ use reth_provider::{ }; use reth_snapshot::{segments, segments::Segment}; use std::{ - path::{Path, PathBuf}, + path::{Path}, sync::Arc, }; From 622159b58cb1e1172fa35c6821ca85f37af7cdcf Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:43:53 +0000 Subject: [PATCH 24/25] Update crates/primitives/src/fs.rs Co-authored-by: Matthias Seitz --- crates/primitives/src/fs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/primitives/src/fs.rs b/crates/primitives/src/fs.rs index 3388d477003d..8e4e50acd6e8 100644 --- a/crates/primitives/src/fs.rs +++ b/crates/primitives/src/fs.rs @@ -35,7 +35,7 @@ pub enum FsPathError { #[error("failed to read dir {path:?}: {source}")] ReadDir { source: io::Error, path: PathBuf }, /// Provides additional context for [`std::fs::rename`]. - #[error("failed to rename from {from:?} to {to:?}: {source}")] + #[error("failed to rename {from:?} to {to:?}: {source}")] Rename { source: io::Error, from: PathBuf, to: PathBuf }, /// Provides additional path context for [`std::fs::File::open`]. #[error("failed to open file {path:?}: {source}")] From 975b47d6393c0c78d8ad78d46d8b7ae8d0a13459 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 14 Nov 2023 16:11:10 +0000 Subject: [PATCH 25/25] fmt --- bin/reth/src/db/snapshots/headers.rs | 5 +---- bin/reth/src/db/snapshots/receipts.rs | 5 +---- bin/reth/src/db/snapshots/transactions.rs | 5 +---- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/bin/reth/src/db/snapshots/headers.rs b/bin/reth/src/db/snapshots/headers.rs index 8b737e833b22..b09b99ebc648 100644 --- a/bin/reth/src/db/snapshots/headers.rs +++ b/bin/reth/src/db/snapshots/headers.rs @@ -13,10 +13,7 @@ use reth_provider::{ providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory, }; use reth_snapshot::{segments, segments::Segment}; -use std::{ - path::{Path}, - sync::Arc, -}; +use std::{path::Path, sync::Arc}; impl Command { pub(crate) fn generate_headers_snapshot( diff --git a/bin/reth/src/db/snapshots/receipts.rs b/bin/reth/src/db/snapshots/receipts.rs index e555c0beb1d1..b0475eeff1b5 100644 --- a/bin/reth/src/db/snapshots/receipts.rs +++ b/bin/reth/src/db/snapshots/receipts.rs @@ -14,10 +14,7 @@ use reth_provider::{ ReceiptProvider, TransactionsProvider, TransactionsProviderExt, }; use reth_snapshot::{segments, segments::Segment}; -use std::{ - path::{Path}, - sync::Arc, -}; +use std::{path::Path, sync::Arc}; impl Command { pub(crate) fn generate_receipts_snapshot( diff --git a/bin/reth/src/db/snapshots/transactions.rs b/bin/reth/src/db/snapshots/transactions.rs index 54b6da65df0c..9d3530d40286 100644 --- a/bin/reth/src/db/snapshots/transactions.rs +++ b/bin/reth/src/db/snapshots/transactions.rs @@ -14,10 +14,7 @@ use reth_provider::{ TransactionsProvider, TransactionsProviderExt, }; use reth_snapshot::{segments, segments::Segment}; -use std::{ - path::{Path}, - sync::Arc, -}; +use std::{path::Path, sync::Arc}; impl Command { pub(crate) fn generate_transactions_snapshot(