Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add directory paths to Snapshotter and SnapshotProvider #5283

Merged
merged 29 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
eb00e6d
add snapshots_path to ChainPath
joshieDo Oct 31, 2023
a8ef221
move HighestSnapshots to primitives crate
joshieDo Oct 31, 2023
f1f32d7
add with_highest_tracker to SnapshotProvider
joshieDo Oct 31, 2023
f5cadf8
add a shared snapshot provvider to providerfactory and dbprovider
joshieDo Oct 31, 2023
1acd28f
add snapshot provider to shared blockchain_db
joshieDo Oct 31, 2023
470a6b6
allow unused for now
joshieDo Oct 31, 2023
113ef23
add get_highest_snapshot to SnapshotProvider
joshieDo Oct 31, 2023
73fd00f
move hihgest snapshot channel inside snapshotter
joshieDo Nov 2, 2023
2fef51f
add default receiver to snapshotter
joshieDo Nov 2, 2023
33dc284
replace with with_snapshot_provider on db provider
joshieDo Nov 2, 2023
cda1ca7
use strum for SnapshotSegment, compression and filters
joshieDo Nov 3, 2023
ec89704
snapshotter takes a directory and reads from it
joshieDo Nov 3, 2023
e87c64d
default snapshot filename doesnt have configuration
joshieDo Nov 3, 2023
99c1fa5
create snapshots directory if it doesnt exist
joshieDo Nov 3, 2023
3be09cd
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider
joshieDo Nov 8, 2023
96b255c
Merge branch 'joshie/db-snap-provider' into joshie/db-snap-provider2
joshieDo Nov 8, 2023
d0699f7
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
892b260
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
756d00b
Update crates/storage/provider/src/providers/snapshot/manager.rs
joshieDo Nov 13, 2023
5891d0a
Update crates/primitives/src/snapshot/segment.rs
joshieDo Nov 13, 2023
d2c21e9
Update crates/snapshot/src/snapshotter.rs
joshieDo Nov 13, 2023
a11751f
Merge branch 'joshie/db-snap-provider' into joshie/db-snap-provider2
joshieDo Nov 13, 2023
4574c68
add fn block_range to snapshot cmd
joshieDo Nov 13, 2023
d47812a
add read_dir to primitives fs
joshieDo Nov 13, 2023
7054744
add reth_primitives::fs::rename
joshieDo Nov 14, 2023
77094e5
clippy
joshieDo Nov 14, 2023
622159b
Update crates/primitives/src/fs.rs
joshieDo Nov 14, 2023
975b47d
fmt
joshieDo Nov 14, 2023
c9ee102
Merge remote-tracking branch 'origin/main' into joshie/db-snap-provider2
joshieDo Nov 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 26 additions & 13 deletions bin/reth/src/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use reth_primitives::{
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError, ProviderFactory,
};
use reth_snapshot::segments::{Headers, Segment};
use std::{path::Path, sync::Arc};
use reth_snapshot::{segments, segments::Segment};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
pub(crate) fn generate_headers_snapshot<DB: Database>(
Expand All @@ -23,15 +26,24 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let segment = Headers::new(
compression,
if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
},
);
segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
let range = self.from..=(self.from + self.block_interval - 1);
joshieDo marked this conversation as resolved.
Show resolved Hide resolved
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};

let segment = segments::Headers::new(compression, filters);

segment.snapshot::<DB>(provider, range.clone())?;

// Default name doesn't have any configuration
let default_name: PathBuf = SnapshotSegment::Headers.filename(&range).into();
let new_name: PathBuf = SnapshotSegment::Headers
.filename_with_configuration(filters, compression, &range)
.into();
joshieDo marked this conversation as resolved.
Show resolved Hide resolved

std::fs::rename(default_name, new_name)?;
joshieDo marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
Expand All @@ -55,8 +67,9 @@ impl Command {

let mut row_indexes = range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();
let path =
SnapshotSegment::Headers.filename_with_configuration(filters, compression, &range);
let path = SnapshotSegment::Headers
.filename_with_configuration(filters, compression, &range)
.into();
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Headers, self.from, Some(path))?;
Expand Down
41 changes: 26 additions & 15 deletions bin/reth/src/db/snapshots/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use reth_provider::{
ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
pub(crate) fn generate_receipts_snapshot<DB: Database>(
Expand All @@ -24,15 +27,24 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let segment = segments::Receipts::new(
compression,
if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
},
);
segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
let range = self.from..=(self.from + self.block_interval - 1);
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};

let segment = segments::Receipts::new(compression, filters);

segment.snapshot::<DB>(provider, range.clone())?;

// Default name doesn't have any configuration
let default_name: PathBuf = SnapshotSegment::Receipts.filename(&range).into();
let new_name: PathBuf = SnapshotSegment::Receipts
.filename_with_configuration(filters, compression, &range)
.into();

std::fs::rename(default_name, new_name)?;
joshieDo marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
Expand Down Expand Up @@ -62,11 +74,10 @@ impl Command {

let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

let path = SnapshotSegment::Receipts.filename_with_configuration(
filters,
compression,
&block_range,
);
let path = SnapshotSegment::Receipts
.filename_with_configuration(filters, compression, &block_range)
.into();

let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Receipts, self.from, Some(path))?;
Expand Down
40 changes: 25 additions & 15 deletions bin/reth/src/db/snapshots/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use reth_provider::{
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{path::Path, sync::Arc};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
pub(crate) fn generate_transactions_snapshot<DB: Database>(
Expand All @@ -24,15 +27,24 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let segment = segments::Transactions::new(
compression,
if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
},
);
segment.snapshot::<DB>(provider, self.from..=(self.from + self.block_interval - 1))?;
let range = self.from..=(self.from + self.block_interval - 1);
let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};

let segment = segments::Transactions::new(compression, filters);

segment.snapshot::<DB>(provider, range.clone())?;

// Default name doesn't have any configuration
let default_name: PathBuf = SnapshotSegment::Transactions.filename(&range).into();
let new_name: PathBuf = SnapshotSegment::Transactions
.filename_with_configuration(filters, compression, &range)
.into();

std::fs::rename(default_name, new_name)?;

Ok(())
}
Expand Down Expand Up @@ -62,11 +74,9 @@ impl Command {

let mut row_indexes = tx_range.clone().collect::<Vec<_>>();

let path = SnapshotSegment::Transactions.filename_with_configuration(
filters,
compression,
&block_range,
);
let path = SnapshotSegment::Transactions
.filename_with_configuration(filters, compression, &block_range)
.into();
let provider = SnapshotProvider::default();
let jar_provider =
provider.get_segment_provider(SnapshotSegment::Transactions, self.from, Some(path))?;
Expand Down
5 changes: 5 additions & 0 deletions bin/reth/src/dirs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ impl<D> ChainPath<D> {
self.0.join("db").into()
}

/// Returns the path to the snapshots directory for this chain.
pub fn snapshots_path(&self) -> PathBuf {
self.0.join("snapshots").into()
}

/// Returns the path to the reth p2p secret key for this chain.
///
/// `<DIR>/<CHAIN_ID>/discovery-secret`
Expand Down
26 changes: 15 additions & 11 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,17 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
// fetch the head block from the database
let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?;

// configure snapshotter
let snapshotter = reth_snapshot::Snapshotter::new(
db.clone(),
data_dir.snapshots_path(),
self.chain.clone(),
self.chain.snapshot_block_interval,
)?;

// setup the blockchain provider
let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain));
let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain))
.with_snapshots(data_dir.snapshots_path(), snapshotter.highest_snapshot_receiver());
let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?;
let blob_store = InMemoryBlobStore::default();
let validator = TransactionValidationTaskExecutor::eth_builder(Arc::clone(&self.chain))
Expand Down Expand Up @@ -454,12 +463,14 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
None
};

let (highest_snapshots_tx, highest_snapshots_rx) = watch::channel(None);

let mut hooks = EngineHooks::new();

let pruner_events = if let Some(prune_config) = prune_config {
let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx);
let mut pruner = self.build_pruner(
&prune_config,
db.clone(),
snapshotter.highest_snapshot_receiver(),
);

let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
Expand All @@ -470,13 +481,6 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Either::Right(stream::empty())
};

let _snapshotter = reth_snapshot::Snapshotter::new(
db,
self.chain.clone(),
self.chain.snapshot_block_interval,
highest_snapshots_tx,
);

// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
client,
Expand Down
8 changes: 7 additions & 1 deletion crates/primitives/src/snapshot/compression.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#[derive(Debug, Copy, Clone, Default)]
use strum::AsRefStr;

#[derive(Debug, Copy, Clone, Default, AsRefStr)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
#[allow(missing_docs)]
/// Snapshot compression
pub enum Compression {
#[strum(serialize = "lz4")]
Lz4,
#[strum(serialize = "zstd")]
Zstd,
#[strum(serialize = "zstd-dict")]
ZstdWithDictionary,
#[strum(serialize = "uncompressed")]
#[default]
Uncompressed,
}
9 changes: 7 additions & 2 deletions crates/primitives/src/snapshot/filters.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use strum::AsRefStr;

#[derive(Debug, Copy, Clone)]
/// Snapshot filters.
pub enum Filters {
Expand All @@ -14,20 +16,23 @@ impl Filters {
}
}

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, AsRefStr)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
/// Snapshot inclusion filter. Also see [Filters].
pub enum InclusionFilter {
#[strum(serialize = "cuckoo")]
/// Cuckoo filter
Cuckoo,
}

#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, AsRefStr)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
/// Snapshot perfect hashing function. Also see [Filters].
pub enum PerfectHashingFunction {
#[strum(serialize = "fmph")]
/// Fingerprint-Based Minimal Perfect Hash Function
Fmph,
#[strum(serialize = "gofmph")]
/// Fingerprint-Based Minimal Perfect Hash Function with Group Optimization
GoFmph,
}
26 changes: 26 additions & 0 deletions crates/primitives/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,35 @@ mod compression;
mod filters;
mod segment;

use alloy_primitives::BlockNumber;
pub use compression::Compression;
pub use filters::{Filters, InclusionFilter, PerfectHashingFunction};
pub use segment::{SegmentHeader, SnapshotSegment};

/// Default snapshot block count.
pub const BLOCKS_PER_SNAPSHOT: u64 = 500_000;

/// Highest snapshotted block numbers, per data part.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct HighestSnapshots {
/// Highest snapshotted block of headers, inclusive.
/// If [`None`], no snapshot is available.
pub headers: Option<BlockNumber>,
/// Highest snapshotted block of receipts, inclusive.
/// If [`None`], no snapshot is available.
pub receipts: Option<BlockNumber>,
/// Highest snapshotted block of transactions, inclusive.
/// If [`None`], no snapshot is available.
pub transactions: Option<BlockNumber>,
}

impl HighestSnapshots {
/// Returns the highest snapshot if it exists for a segment
pub fn highest(&self, segment: SnapshotSegment) -> Option<BlockNumber> {
match segment {
SnapshotSegment::Headers => self.headers,
SnapshotSegment::Transactions => self.transactions,
SnapshotSegment::Receipts => self.receipts,
}
}
}
Loading
Loading