Skip to content

Commit

Permalink
chore: remove unused static-file code (#9178)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Jul 1, 2024
1 parent cf8a916 commit 9d4722e
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 499 deletions.
77 changes: 4 additions & 73 deletions crates/static-file/static-file/src/segments/headers.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::segments::{dataset_for_compression, prepare_jar, Segment, SegmentHeader};
use crate::segments::Segment;
use alloy_primitives::BlockNumber;
use reth_db::{static_file::create_static_file_T1_T2_T3, tables, RawKey, RawTable};
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
DatabaseProviderRO,
};
use reth_static_file_types::{SegmentConfig, StaticFileSegment};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::{ops::RangeInclusive, path::Path};
use std::ops::RangeInclusive;

/// Static File segment responsible for [`StaticFileSegment::Headers`] part of data.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -56,73 +56,4 @@ impl<DB: Database> Segment<DB> for Headers {

Ok(())
}

/// Writes the `Headers` segment for `block_range` into a static-file jar saved
/// under `directory`, copying one row per block from the `Headers`,
/// `HeaderTerminalDifficulties` and `CanonicalHeaders` tables.
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
// One row per block in the range, for each of the three tables.
let range_len = block_range.clone().count();
// Configure a 3-column jar. The closure lazily produces the zstd-dictionary
// training datasets (one per table) and is only invoked when the configured
// compression actually requires a dictionary.
let jar = prepare_jar::<DB, 3>(
provider,
directory,
StaticFileSegment::Headers,
config,
block_range.clone(),
range_len,
|| {
Ok([
dataset_for_compression::<DB, tables::Headers>(
provider,
&block_range,
range_len,
)?,
dataset_for_compression::<DB, tables::HeaderTerminalDifficulties>(
provider,
&block_range,
range_len,
)?,
dataset_for_compression::<DB, tables::CanonicalHeaders>(
provider,
&block_range,
range_len,
)?,
])
},
)?;

// Generate list of hashes for filters & PHF
// Keys come from `CanonicalHeaders`, walked forward from the start of the range;
// `hashes` stays `None` when the segment config has no filters.
let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
let hashes = if config.filters.has_filters() {
Some(
cursor
.walk(Some(RawKey::from(*block_range.start())))?
.take(range_len)
.map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())),
)
} else {
None
};

// Copy the three tables into the jar in a single pass over the block range.
create_static_file_T1_T2_T3::<
tables::Headers,
tables::HeaderTerminalDifficulties,
tables::CanonicalHeaders,
BlockNumber,
SegmentHeader,
>(
provider.tx_ref(),
block_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
range_len,
jar,
)?;

Ok(())
}
}
93 changes: 4 additions & 89 deletions crates/static-file/static-file/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,11 @@ mod receipts;
pub use receipts::Receipts;

use alloy_primitives::BlockNumber;
use reth_db::{RawKey, RawTable};
use reth_db_api::{cursor::DbCursorRO, database::Database, table::Table, transaction::DbTx};
use reth_nippy_jar::NippyJar;
use reth_provider::{
providers::StaticFileProvider, DatabaseProviderRO, ProviderError, TransactionsProviderExt,
};
use reth_static_file_types::{
find_fixed_range, Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig,
SegmentHeader, StaticFileSegment,
};
use reth_db_api::database::Database;
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::{ops::RangeInclusive, path::Path};

pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
use std::ops::RangeInclusive;

/// A segment represents moving some portion of the data to static files.
pub trait Segment<DB: Database>: Send + Sync {
Expand All @@ -38,80 +29,4 @@ pub trait Segment<DB: Database>: Send + Sync {
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;

/// Create a static file of data for the provided block range. The `directory` parameter
/// determines the static file's save location.
///
/// # Errors
/// Returns a [`ProviderResult`] error if reading the database or producing the
/// static file fails.
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;
}

/// Returns a [`NippyJar`] according to the desired configuration. The `directory` parameter
/// determines the static file's save location.
///
/// `prepare_compression` is only invoked when the configured compression is
/// `ZstdWithDictionary`; it must yield one training dataset per column.
pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
provider: &DatabaseProviderRO<DB>,
directory: impl AsRef<Path>,
segment: StaticFileSegment,
segment_config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
total_rows: usize,
prepare_compression: impl Fn() -> ProviderResult<Rows<COLUMNS>>,
) -> ProviderResult<NippyJar<SegmentHeader>> {
// Headers are keyed by block number only; receipt/transaction segments also
// track the transaction range covered by the block range.
let tx_range = match segment {
StaticFileSegment::Headers => None,
StaticFileSegment::Receipts | StaticFileSegment::Transactions => {
Some(provider.transaction_range_by_block_range(block_range.clone())?.into())
}
};

// The file name encodes the fixed range that contains the range's end block.
let mut nippy_jar = NippyJar::new(
COLUMNS,
&directory.as_ref().join(segment.filename(&find_fixed_range(*block_range.end())).as_str()),
SegmentHeader::new(block_range.clone().into(), Some(block_range.into()), tx_range, segment),
);

// Apply the requested compression; only the dictionary variant needs the
// (potentially expensive) training dataset.
nippy_jar = match segment_config.compression {
Compression::Lz4 => nippy_jar.with_lz4(),
Compression::Zstd => nippy_jar.with_zstd(false, 0),
Compression::ZstdWithDictionary => {
let dataset = prepare_compression()?;

nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar
.prepare_compression(dataset.to_vec())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
};

// Optional inclusion filter + perfect hashing function, sized to the row count.
if let Filters::WithFilters(inclusion_filter, phf) = segment_config.filters {
nippy_jar = match inclusion_filter {
InclusionFilter::Cuckoo => nippy_jar.with_cuckoo_filter(total_rows),
};
nippy_jar = match phf {
PerfectHashingFunction::Fmph => nippy_jar.with_fmph(),
PerfectHashingFunction::GoFmph => nippy_jar.with_gofmph(),
};
}

Ok(nippy_jar)
}

/// Builds a zstd-dictionary training dataset for table `T` from the most
/// recent rows of `range`, walking backwards from the range's end. At most
/// 1000 rows are sampled.
pub(crate) fn dataset_for_compression<DB: Database, T: Table<Key = u64>>(
provider: &DatabaseProviderRO<DB>,
range: &RangeInclusive<u64>,
range_len: usize,
) -> ProviderResult<Vec<Vec<u8>>> {
// Cap the sample at 1000 rows regardless of how large the range is.
let sample_size = range_len.min(1000);
let mut cursor = provider.tx_ref().cursor_read::<RawTable<T>>()?;

let mut dataset = Vec::with_capacity(sample_size);
for row in cursor.walk_back(Some(RawKey::from(*range.end())))?.take(sample_size) {
let (_key, value) = row.expect("should exist");
dataset.push(value.into_value());
}
Ok(dataset)
}
64 changes: 6 additions & 58 deletions crates/static-file/static-file/src/segments/receipts.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::segments::{dataset_for_compression, prepare_jar, Segment};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_db::{static_file::create_static_file_T1, tables};
use crate::segments::Segment;
use alloy_primitives::BlockNumber;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
BlockReader, DatabaseProviderRO,
};
use reth_static_file_types::{SegmentConfig, SegmentHeader, StaticFileSegment};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{ops::RangeInclusive, path::Path};
use std::ops::RangeInclusive;

/// Static File segment responsible for [`StaticFileSegment::Receipts`] part of data.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -47,56 +47,4 @@ impl<DB: Database> Segment<DB> for Receipts {

Ok(())
}

/// Writes the `Receipts` segment for `block_range` into a static-file jar saved
/// under `directory`, copying one row per transaction in the range from the
/// `Receipts` table.
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
// Receipts are keyed by transaction number, so translate the block range
// into the corresponding transaction range first.
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();

// Single-column jar; the closure lazily builds the zstd-dictionary training
// dataset only when the configured compression needs it.
let jar = prepare_jar::<DB, 1>(
provider,
directory,
StaticFileSegment::Receipts,
config,
block_range,
tx_range_len,
|| {
Ok([dataset_for_compression::<DB, tables::Receipts>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;

// Generate list of hashes for filters & PHF
// `None` when the segment config has no filters; range end is exclusive, hence +1.
let hashes = if config.filters.has_filters() {
Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
)
} else {
None
};

// Copy the receipts table into the jar over the transaction range.
create_static_file_T1::<tables::Receipts, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
jar,
)?;

Ok(())
}
}
64 changes: 6 additions & 58 deletions crates/static-file/static-file/src/segments/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::segments::{dataset_for_compression, prepare_jar, Segment};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_db::{static_file::create_static_file_T1, tables};
use crate::segments::Segment;
use alloy_primitives::BlockNumber;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
BlockReader, DatabaseProviderRO,
};
use reth_static_file_types::{SegmentConfig, SegmentHeader, StaticFileSegment};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{ops::RangeInclusive, path::Path};
use std::ops::RangeInclusive;

/// Static File segment responsible for [`StaticFileSegment::Transactions`] part of data.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -53,56 +53,4 @@ impl<DB: Database> Segment<DB> for Transactions {

Ok(())
}

/// Writes the `Transactions` segment for `block_range` into a static-file jar
/// saved under `directory`, copying one row per transaction in the range from
/// the `Transactions` table.
fn create_static_file_file(
&self,
provider: &DatabaseProviderRO<DB>,
directory: &Path,
config: SegmentConfig,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
// Transactions are keyed by transaction number, so translate the block
// range into the corresponding transaction range first.
let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
let tx_range_len = tx_range.clone().count();

// Single-column jar; the closure lazily builds the zstd-dictionary training
// dataset only when the configured compression needs it.
let jar = prepare_jar::<DB, 1>(
provider,
directory,
StaticFileSegment::Transactions,
config,
block_range,
tx_range_len,
|| {
Ok([dataset_for_compression::<DB, tables::Transactions>(
provider,
&tx_range,
tx_range_len,
)?])
},
)?;

// Generate list of hashes for filters & PHF
// `None` when the segment config has no filters; range end is exclusive, hence +1.
let hashes = if config.filters.has_filters() {
Some(
provider
.transaction_hashes_by_range(*tx_range.start()..(*tx_range.end() + 1))?
.into_iter()
.map(|(tx, _)| Ok(tx)),
)
} else {
None
};

// Copy the transactions table into the jar over the transaction range.
create_static_file_T1::<tables::Transactions, TxNumber, SegmentHeader>(
provider.tx_ref(),
tx_range,
None,
// We already prepared the dictionary beforehand
None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
hashes,
tx_range_len,
jar,
)?;

Ok(())
}
}
Loading

0 comments on commit 9d4722e

Please sign in to comment.