diff --git a/src/common.rs b/src/common.rs
index dec1023..50189a1 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -1 +1,3 @@
 pub mod db;
+pub mod lmdb_utils;
+pub mod progress;
diff --git a/src/common/db.rs b/src/common/db.rs
index 89e7aed..098fe72 100644
--- a/src/common/db.rs
+++ b/src/common/db.rs
@@ -39,6 +39,7 @@ use thiserror::Error;
 
 use casper_types::bytesrepr::Error as BytesreprError;
 
+pub const STORAGE_FILE_NAME: &str = "storage.lmdb";
 const ENTRY_LOG_INTERVAL: usize = 100_000;
 const MAX_DB_READERS: u32 = 100;
diff --git a/src/common/db/tests.rs b/src/common/db/tests.rs
index 7f61ebb..512179d 100644
--- a/src/common/db/tests.rs
+++ b/src/common/db/tests.rs
@@ -17,9 +17,9 @@ fn gen_faulty_bytes(rng: &mut ThreadRng) -> Vec<u8> {
 fn populate_db(env: &Environment, db: &LmdbDatabase) {
     let mut rng = rand::thread_rng();
-    let entries_count = rng.gen_range(10u32..100u32);
+    let entry_count = rng.gen_range(10u32..100u32);
     let mut rw_tx = env.begin_rw_txn().expect("couldn't begin rw transaction");
-    for i in 0..entries_count {
+    for i in 0..entry_count {
         let bytes = gen_bytes(&mut rng);
         let key: [u8; 4] = i.to_le_bytes();
         rw_tx.put(*db, &key, &bytes, WriteFlags::empty()).unwrap();
@@ -29,9 +29,9 @@ fn populate_faulty_db(env: &Environment, db: &LmdbDatabase) {
     let mut rng = rand::thread_rng();
-    let entries_count = rng.gen_range(10u32..100u32);
+    let entry_count = rng.gen_range(10u32..100u32);
     let mut rw_tx = env.begin_rw_txn().expect("couldn't begin rw transaction");
-    for i in 0..entries_count {
+    for i in 0..entry_count {
         let bytes = if i % 5 == 0 {
             gen_faulty_bytes(&mut rng)
         } else {
@@ -141,7 +141,7 @@ fn sanity_check_ser_deser() {
 #[test]
 fn good_db_should_pass_check() {
-    let fixture = LmdbTestFixture::new(Some(MockDb::db_name()));
+    let fixture = LmdbTestFixture::new(Some(MockDb::db_name()), None);
     populate_db(&fixture.env, &fixture.db);
 
     assert!(MockDb::check_db(&fixture.env, true, 0).is_ok());
@@ -152,7 +152,7 @@
 #[test]
 fn bad_db_should_fail_check() {
-    let fixture = LmdbTestFixture::new(Some(MockDb::db_name()));
+    let fixture = LmdbTestFixture::new(Some(MockDb::db_name()), None);
     populate_faulty_db(&fixture.env, &fixture.db);
 
     assert!(MockDb::check_db(&fixture.env, true, 0).is_err());
diff --git a/src/common/lmdb_utils.rs b/src/common/lmdb_utils.rs
new file mode 100644
index 0000000..d663cd5
--- /dev/null
+++ b/src/common/lmdb_utils.rs
@@ -0,0 +1,90 @@
+use std::result::Result;
+
+use lmdb::{Database, Error, Transaction};
+use lmdb_sys::{mdb_stat, MDB_stat};
+
+/// Retrieves the number of entries in a database.
+pub fn entry_count<T: Transaction>(txn: &'_ T, database: Database) -> Result<usize, Error> {
+    let mut stat = MDB_stat {
+        ms_psize: 0,
+        ms_depth: 0,
+        ms_branch_pages: 0,
+        ms_leaf_pages: 0,
+        ms_overflow_pages: 0,
+        ms_entries: 0,
+    };
+    let result = unsafe { mdb_stat(txn.txn(), database.dbi(), &mut stat as *mut MDB_stat) };
+    if result != 0 {
+        Err(Error::from_err_code(result))
+    } else {
+        Ok(stat.ms_entries)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use lmdb::{Transaction, WriteFlags};
+
+    use crate::test_utils::LmdbTestFixture;
+
+    use super::entry_count;
+
+    #[test]
+    fn db_entry_count() {
+        let fixture = LmdbTestFixture::new(None, None);
+        let env = &fixture.env;
+        let db = fixture.db;
+
+        if let Ok(txn) = env.begin_ro_txn() {
+            assert_eq!(entry_count(&txn, db).unwrap(), 0);
+            txn.commit().unwrap();
+        }
+
+        let first_dummy_input = [0u8, 1u8];
+        let second_dummy_input = [1u8, 2u8];
+        // Insert the first entry into the database.
+        if let Ok(mut txn) = env.begin_rw_txn() {
+            txn.put(
+                fixture.db,
+                &first_dummy_input,
+                &first_dummy_input,
+                WriteFlags::empty(),
+            )
+            .unwrap();
+            txn.commit().unwrap();
+        };
+
+        if let Ok(txn) = env.begin_ro_txn() {
+            assert_eq!(entry_count(&txn, db).unwrap(), 1);
+            txn.commit().unwrap();
+        }
+
+        // Insert the second entry into the database.
+        if let Ok(mut txn) = env.begin_rw_txn() {
+            txn.put(
+                fixture.db,
+                &second_dummy_input,
+                &second_dummy_input,
+                WriteFlags::empty(),
+            )
+            .unwrap();
+            txn.commit().unwrap();
+        };
+
+        if let Ok(txn) = env.begin_ro_txn() {
+            assert_eq!(entry_count(&txn, db).unwrap(), 2);
+            txn.commit().unwrap();
+        };
+
+        // Delete the first entry from the database.
+        if let Ok(mut txn) = env.begin_rw_txn() {
+            txn.del(fixture.db, &first_dummy_input, None).unwrap();
+            txn.commit().unwrap();
+        };
+
+        if let Ok(txn) = env.begin_ro_txn() {
+            assert_eq!(entry_count(&txn, db).unwrap(), 1);
+            txn.commit().unwrap();
+        };
+    }
+}
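// Usage sketch (not part of the diff): how a caller might use the new
// `entry_count` helper; `env` and `db` stand for an already-open lmdb
// `Environment` and `Database`, as in the test fixture above.

use lmdb::Transaction;

use crate::common::lmdb_utils::entry_count;

fn print_entry_count(env: &lmdb::Environment, db: lmdb::Database) {
    // `mdb_stat` is evaluated against this transaction's snapshot, so the
    // count is consistent with what the transaction can see.
    let txn = env.begin_ro_txn().expect("couldn't begin ro transaction");
    let count = entry_count(&txn, db).expect("couldn't stat database");
    println!("database holds {} entries", count);
    txn.commit().expect("couldn't commit ro transaction");
}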
diff --git a/src/common/progress.rs b/src/common/progress.rs
new file mode 100644
index 0000000..deae7a1
--- /dev/null
+++ b/src/common/progress.rs
@@ -0,0 +1,71 @@
+use std::result::Result;
+
+use log::warn;
+
+// The tracker will log 20 times during its operation.
+const STEPS: usize = 20;
+// The tracker will log progress in 5% completion intervals.
+const PROGRESS_MULTIPLIER: u64 = 100 / STEPS as u64;
+// Error message for initialization of a progress tracker with nothing
+// to process.
+const NULL_TOTAL_TO_PROCESS_ERROR: &str = "Cannot initialize total to process with 0";
+
+/// Tracks and logs progress of an operation in a human readable form.
+/// Whenever (1 / number of steps) of the total amount has been processed,
+/// this structure calls the `log_progress` function, which takes the
+/// percentage completed so far as a parameter.
+pub struct ProgressTracker {
+    /// Total amount there is to process.
+    total_to_process: usize,
+    /// Amount processed so far.
+    processed: usize,
+    /// Internal counter tracking the number of steps completed so far,
+    /// relative to the maximum number of steps this operation will take,
+    /// defined in `STEPS`.
+    progress_factor: u64,
+    /// Function which takes the completion rate as a percentage as
+    /// input. It is called zero or more times as progress is being made
+    /// using `ProgressTracker::advance_by`. The purpose of this function
+    /// is to allow users to create custom log messages for their specific
+    /// operation.
+    log_progress: Box<dyn Fn(u64)>,
+}
+
+impl ProgressTracker {
+    /// Create a new progress tracker by initializing it with a non-zero
+    /// amount to be processed and a log function.
+    pub fn new(
+        total_to_process: usize,
+        log_progress: Box<dyn Fn(u64)>,
+    ) -> Result<Self, &'static str> {
+        if total_to_process == 0 {
+            Err(NULL_TOTAL_TO_PROCESS_ERROR)
+        } else {
+            Ok(Self {
+                total_to_process,
+                processed: 0,
+                progress_factor: 1,
+                log_progress,
+            })
+        }
+    }
+
+    /// Advance the progress tracker by a specific amount. If it passes
+    /// a milestone (1 / `STEPS` of the total amount to process),
+    /// `log_progress` will be called with the current completion rate
+    /// as input.
+    pub fn advance_by(&mut self, step: usize) {
+        self.processed += step;
+        while self.processed >= (self.total_to_process * self.progress_factor as usize) / STEPS {
+            (*self.log_progress)(self.progress_factor * PROGRESS_MULTIPLIER);
+            self.progress_factor += 1;
+        }
+        if self.processed > self.total_to_process {
+            warn!(
+                "Exceeded total amount to process {} by {}",
+                self.total_to_process,
+                self.processed - self.total_to_process
+            );
+        }
+    }
+}
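// Usage sketch (not part of the diff): driving a `ProgressTracker` by hand.
// With a total of 1_000 and `STEPS == 20`, the callback fires once per
// completed 5% milestone, so this loop logs 5%, 10%, ..., 100%.

use log::info;

use crate::common::progress::ProgressTracker;

fn track_fixed_workload() {
    let mut tracker = ProgressTracker::new(
        1_000,
        Box::new(|completion| info!("Workload {}% complete...", completion)),
    )
    .expect("total to process must be non-zero");
    for _ in 0..20 {
        // Each chunk is exactly one milestone: 1_000 / 20 = 50 units.
        tracker.advance_by(50);
    }
}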
diff --git a/src/main.rs b/src/main.rs
index 2c35c79..3b62082 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,7 +9,7 @@ use std::{fs::OpenOptions, process};
 
 use clap::{crate_description, crate_version, Arg, Command};
 use log::error;
 
-use subcommands::{archive, check, latest_block_summary, trie_compact, unsparse};
+use subcommands::{archive, check, latest_block_summary, trie_compact, unsparse, Error};
 
 const LOGGING: &str = "logging";
@@ -61,20 +61,26 @@ fn main() {
     );
 
     let (subcommand_name, matches) = arg_matches.subcommand().unwrap_or_else(|| {
-        error!("{}", cli().get_long_about().unwrap());
+        error!(
+            "{}",
+            cli().get_long_about().expect("should have long about")
+        );
         process::exit(1);
     });
 
-    let succeeded = match subcommand_name {
-        archive::COMMAND_NAME => archive::run(matches),
-        check::COMMAND_NAME => check::run(matches),
-        latest_block_summary::COMMAND_NAME => latest_block_summary::run(matches),
-        trie_compact::COMMAND_NAME => trie_compact::run(matches),
-        unsparse::COMMAND_NAME => unsparse::run(matches),
+    let result: Result<(), Error> = match subcommand_name {
+        archive::COMMAND_NAME => archive::run(matches).map_err(Error::from),
+        check::COMMAND_NAME => check::run(matches).map_err(Error::from),
+        latest_block_summary::COMMAND_NAME => {
+            latest_block_summary::run(matches).map_err(Error::from)
+        }
+        trie_compact::COMMAND_NAME => trie_compact::run(matches).map_err(Error::from),
+        unsparse::COMMAND_NAME => unsparse::run(matches).map_err(Error::from),
         _ => unreachable!("{} should be handled above", subcommand_name),
     };
 
-    if !succeeded {
+    if let Err(run_err) = result {
+        error!("{}", run_err);
         process::exit(1);
     }
 }
diff --git a/src/subcommands.rs b/src/subcommands.rs
index 83356a8..3c29bf7 100644
--- a/src/subcommands.rs
+++ b/src/subcommands.rs
@@ -3,3 +3,27 @@ pub mod check;
 pub mod latest_block_summary;
 pub mod trie_compact;
 pub mod unsparse;
+
+use thiserror::Error as ThisError;
+
+use archive::{CreateError, UnpackError};
+use check::Error as CheckError;
+use latest_block_summary::Error as LatestBlockSummaryError;
+use trie_compact::Error as TrieCompactError;
+use unsparse::Error as UnsparseError;
+
+#[derive(ThisError, Debug)]
+pub enum Error {
+    #[error("Archive create failed: {0}")]
+    ArchiveCreate(#[from] CreateError),
+    #[error("Archive unpack failed: {0}")]
+    ArchiveUnpack(#[from] UnpackError),
+    #[error("Check command failed: {0}")]
+    Check(#[from] CheckError),
+    #[error("Latest block summary command failed: {0}")]
+    LatestBlockSummary(#[from] LatestBlockSummaryError),
+    #[error("Trie compact failed: {0}")]
+    TrieCompact(#[from] TrieCompactError),
+    #[error("Unsparse failed: {0}")]
+    Unsparse(#[from] UnsparseError),
+}
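// Sketch (not part of the diff) of the conversion chain the new enum enables:
// `#[from]` derives `From<check::Error> for Error`, which is what
// `check::run(matches).map_err(Error::from)` relies on in `main.rs`. The same
// impl also lets `?` convert automatically in any helper returning the
// top-level `Error`:

use crate::subcommands::{check, Error};

fn run_check(matches: &clap::ArgMatches) -> Result<(), Error> {
    // `?` applies the derived `From<check::Error> for Error`.
    check::run(matches)?;
    Ok(())
}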
failed: {0}")] + LatestBlockSummary(#[from] LatestBlockSummaryError), + #[error("Trie compact failed: {0}")] + TrieCompact(#[from] TrieCompactError), + #[error("Unsparse failed: {0}")] + Unsparse(#[from] UnsparseError), +} diff --git a/src/subcommands/archive.rs b/src/subcommands/archive.rs index d141058..fcebc83 100644 --- a/src/subcommands/archive.rs +++ b/src/subcommands/archive.rs @@ -1,6 +1,12 @@ use std::process; use clap::{ArgMatches, Command}; +use thiserror::Error as ThisError; + +pub use create::Error as CreateError; +pub use unpack::Error as UnpackError; + +use super::Error as SubcommandError; mod create; mod ring_buffer; @@ -15,6 +21,23 @@ enum DisplayOrder { Unpack, } +#[derive(ThisError, Debug)] +pub enum Error { + #[error("create: {0}")] + Create(#[from] CreateError), + #[error("unpack: {0}")] + Unpack(#[from] UnpackError), +} + +impl From for SubcommandError { + fn from(err: Error) -> Self { + match err { + Error::Create(create_err) => SubcommandError::ArchiveCreate(create_err), + Error::Unpack(unpack_err) => SubcommandError::ArchiveUnpack(unpack_err), + } + } +} + pub fn command(display_order: usize) -> Command<'static> { Command::new(COMMAND_NAME) .display_order(display_order) @@ -23,14 +46,14 @@ pub fn command(display_order: usize) -> Command<'static> { .subcommand(unpack::command(DisplayOrder::Unpack as usize)) } -pub fn run(matches: &ArgMatches) -> bool { +pub fn run(matches: &ArgMatches) -> Result<(), Error> { let (subcommand_name, matches) = matches.subcommand().unwrap_or_else(|| { process::exit(1); }); match subcommand_name { - create::COMMAND_NAME => create::run(matches), - unpack::COMMAND_NAME => unpack::run(matches), + create::COMMAND_NAME => create::run(matches).map_err(Error::Create), + unpack::COMMAND_NAME => unpack::run(matches).map_err(Error::Unpack), _ => unreachable!("{} should be handled above", subcommand_name), } } diff --git a/src/subcommands/archive/create.rs b/src/subcommands/archive/create.rs index 0626b80..41cd2c5 100644 --- a/src/subcommands/archive/create.rs +++ b/src/subcommands/archive/create.rs @@ -11,6 +11,7 @@ use thiserror::Error as ThisError; use super::zstd_utils::Error as ZstdError; pub const COMMAND_NAME: &str = "create"; +const OVERWRITE: &str = "overwrite"; const OUTPUT: &str = "output"; const DB: &str = "db-dir"; @@ -29,6 +30,7 @@ pub enum Error { enum DisplayOrder { Db, Output, + Overwrite, } pub fn command(display_order: usize) -> Command<'static> { @@ -57,16 +59,23 @@ pub fn command(display_order: usize) -> Command<'static> { .value_name("FILE_PATH") .help("Output file path for the compressed tar archive."), ) + .arg( + Arg::new(OVERWRITE) + .display_order(DisplayOrder::Overwrite as usize) + .required(false) + .short('w') + .long(OVERWRITE) + .takes_value(false) + .help( + "Overwrite an already existing archive file in destination \ + directory.", + ), + ) } -pub fn run(matches: &ArgMatches) -> bool { +pub fn run(matches: &ArgMatches) -> Result<(), Error> { let db_path = matches.value_of(DB).unwrap(); let dest = matches.value_of(OUTPUT).unwrap(); - let result = pack::create_archive(db_path, dest); - - if let Err(error) = &result { - error!("Archive packing failed. 
{}", error); - } - - result.is_ok() + let overwrite = matches.is_present(OVERWRITE); + pack::create_archive(db_path, dest, overwrite) } diff --git a/src/subcommands/archive/create/pack.rs b/src/subcommands/archive/create/pack.rs index edfb912..e26c15d 100644 --- a/src/subcommands/archive/create/pack.rs +++ b/src/subcommands/archive/create/pack.rs @@ -16,6 +16,7 @@ const BUFFER_CAPACITY: usize = 1_000; pub fn create_archive, P2: AsRef>( db_dir_path: P1, dest: P2, + overwrite: bool, ) -> Result<(), Error> { let ring_buffer = BlockingRingBuffer::new(BUFFER_CAPACITY); let (producer, mut consumer) = ring_buffer.split(); @@ -23,17 +24,18 @@ pub fn create_archive, P2: AsRef>( let db_dir_path_copy = db_dir_path.as_ref().to_path_buf(); let handle = thread::spawn(move || { let mut archive_stream = - ArchiveStream::new(&db_dir_path_copy, producer).unwrap_or_else(|_| { + ArchiveStream::new(&db_dir_path_copy, producer).unwrap_or_else(|io_err| { panic!( - "Couldn't read files from {}", - db_dir_path_copy.to_string_lossy() + "Couldn't read files from {}: {}", + db_dir_path_copy.to_string_lossy(), + io_err ) }); archive_stream.pack().expect("Couldn't archive files"); }); let output_file = OpenOptions::new() - .create_new(true) + .create_new(!overwrite) .write(true) .open(&dest) .map_err(Error::Destination)?; diff --git a/src/subcommands/archive/create/tests.rs b/src/subcommands/archive/create/tests.rs index 34b6dc5..1d9c485 100644 --- a/src/subcommands/archive/create/tests.rs +++ b/src/subcommands/archive/create/tests.rs @@ -50,7 +50,32 @@ fn archive_create_roundtrip() { let out_dir = tempfile::tempdir().unwrap(); let archive_path = dst_dir.path().join("test_archive.tar.zst"); // Create the compressed archive. - assert!(pack::create_archive(&src_dir, &archive_path).is_ok()); + assert!(pack::create_archive(&src_dir, &archive_path, false).is_ok()); + // Unpack and then delete the archive. + unpack_mock_archive(&archive_path, &out_dir); + for idx in 0..NUM_TEST_FILES { + let contents = fs::read(out_dir.path().join(&format!("file_{}", idx))).unwrap(); + if contents != test_payloads.payloads[idx] { + panic!("Contents of file {} are different from the original", idx); + } + } +} + +#[test] +fn archive_create_overwrite() { + // Create the mock test directory with randomly-filled files. + let src_dir = &(*MOCK_DIR).0; + let test_payloads = &(*MOCK_DIR).1; + let dst_dir = tempfile::tempdir().unwrap(); + let out_dir = tempfile::tempdir().unwrap(); + let archive_path = dst_dir.path().join("test_archive.tar.zst"); + // Create a mock existing file at the expected destination. + fs::write(&archive_path, "dummy input").unwrap(); + // File already exists, so creating the archive without the overwrite flag + // should fail. + assert!(pack::create_archive(&src_dir, &archive_path, false).is_err()); + // Create the compressed archive with the overwrite set. + assert!(pack::create_archive(&src_dir, &archive_path, true).is_ok()); // Unpack and then delete the archive. unpack_mock_archive(&archive_path, &out_dir); for idx in 0..NUM_TEST_FILES { @@ -68,17 +93,23 @@ fn archive_create_bad_input() { let inexistent_file_path = root_dst.path().join("bogus_path"); // Source doesn't exist. - assert!(pack::create_archive(&inexistent_file_path, &inexistent_file_path).is_err()); + assert!(pack::create_archive(&inexistent_file_path, &inexistent_file_path, false).is_err()); // Source is not a directory. 
diff --git a/src/subcommands/archive/unpack.rs b/src/subcommands/archive/unpack.rs
index a1c2007..3d6ea7a 100644
--- a/src/subcommands/archive/unpack.rs
+++ b/src/subcommands/archive/unpack.rs
@@ -53,12 +53,23 @@ fn validate_destination_path<P: AsRef<Path>>(path: P) -> Result<(), Error> {
     let path_ref = path.as_ref();
     if path_ref.exists() {
         if path_ref.is_dir() {
+            if path_ref
+                .read_dir()
+                .map_err(Error::Destination)?
+                .any(|entry| entry.is_ok())
+            {
+                Err(Error::Destination(IoError::new(
+                    ErrorKind::InvalidInput,
+                    "not an empty directory",
+                )))
+            } else {
+                Ok(())
+            }
+        } else {
             Err(Error::Destination(IoError::new(
                 ErrorKind::InvalidInput,
                 "not a directory",
             )))
-        } else {
-            Ok(())
         }
     } else {
         fs::create_dir_all(path_ref).map_err(Error::Destination)
@@ -117,7 +128,7 @@
     )
 }
 
-pub fn run(matches: &ArgMatches) -> bool {
+pub fn run(matches: &ArgMatches) -> Result<(), Error> {
     let input = matches
         .value_of(URL)
         .map(|url| Input::Url(url.to_string()))
         .unwrap_or_else(|| {
             matches
                 .value_of(FILE)
                 .map(|path| Input::File(path.to_string()))
                 .unwrap_or_else(|| panic!("Should have one of {} or {}", FILE, URL))
         });
     let dest = matches.value_of(OUTPUT).unwrap();
-    let result = unpack(input, dest);
-
-    if let Err(error) = &result {
-        error!("Archive unpack failed. {}", error);
-    }
-
-    result.is_ok()
+    unpack(input, dest)
 }
diff --git a/src/subcommands/archive/unpack/download_stream.rs b/src/subcommands/archive/unpack/download_stream.rs
index c10e6e9..6d70631 100644
--- a/src/subcommands/archive/unpack/download_stream.rs
+++ b/src/subcommands/archive/unpack/download_stream.rs
@@ -1,18 +1,23 @@
-use std::{io::Read, path::Path, result::Result};
+use std::{
+    io::{Error as IoError, Read},
+    path::Path,
+    result::Result,
+};
 
 use futures::{io, AsyncRead, AsyncReadExt, TryStreamExt};
-use log::info;
+use log::{info, warn};
 use tokio::runtime::{Builder as TokioRuntimeBuilder, Runtime};
 
 use super::Error;
-use crate::subcommands::archive::{tar_utils, zstd_utils};
+use crate::{
+    common::progress::ProgressTracker,
+    subcommands::archive::{tar_utils, zstd_utils},
+};
 
-pub struct HttpStream {
+struct HttpStream {
     runtime: Runtime,
     reader: Box<dyn AsyncRead + Unpin>,
-    pub stream_length: Option<usize>,
-    pub total_bytes_read: usize,
-    pub progress: u64,
+    maybe_progress_tracker: Option<ProgressTracker>,
 }
 
 impl HttpStream {
@@ -21,13 +26,9 @@
         let response_fut = reqwest::get(url).await;
         match response_fut {
             Ok(response) => {
-                let maybe_len = response.content_length().map(|len| {
+                let maybe_len = response.content_length().and_then(|len| {
                     info!("Download size: {} bytes.", len);
-                    // Highly unlikely scenario where we can't convert `u64` to `usize`.
-                    // This would mean we're running on a 32-bit or older system and the
-                    // content length cannot be represented in that system's pointer size.
-                    len.try_into()
-                        .expect("Couldn't convert content length from u64 to usize")
+                    len.try_into().ok()
                 });
                 Ok((
                     response.bytes_stream().map_err(|reqwest_err| {
@@ -42,26 +43,37 @@
         let (http_stream, maybe_content_length) = runtime.block_on(response_future)?;
         let http_stream = http_stream.into_async_read();
         let reader = Box::new(http_stream) as Box<dyn AsyncRead + Unpin>;
+        let mut maybe_progress_tracker = None;
+        match maybe_content_length {
+            Some(len) => match ProgressTracker::new(
+                len,
+                Box::new(|completion| info!("Download {}% complete...", completion)),
+            ) {
+                Ok(progress_tracker) => maybe_progress_tracker = Some(progress_tracker),
+                Err(progress_tracker_error) => {
+                    warn!(
+                        "Couldn't initialize progress tracker: {}",
+                        progress_tracker_error
+                    )
+                }
+            },
+            None => warn!("No stream length provided, progress will not be logged."),
+        }
+
         Ok(Self {
             runtime,
             reader,
-            stream_length: maybe_content_length,
-            total_bytes_read: 0,
-            progress: 1,
+            maybe_progress_tracker,
         })
     }
 }
 
 impl Read for HttpStream {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
         let fut = async { self.reader.read(buf).await };
         let bytes_read = self.runtime.block_on(fut)?;
-        self.total_bytes_read += bytes_read;
-        if let Some(stream_len) = self.stream_length {
-            while self.total_bytes_read > (stream_len * self.progress as usize) / 20 {
-                info!("Download {}% complete...", self.progress * 5);
-                self.progress += 1;
-            }
+        if let Some(progress_tracker) = self.maybe_progress_tracker.as_mut() {
+            progress_tracker.advance_by(bytes_read);
         }
         Ok(bytes_read)
     }
@@ -77,6 +89,5 @@ pub fn download_and_unpack_archive<P: AsRef<Path>>(url: &str, dest: P) -> Result<(), Error> {
     let decoder = zstd_utils::zstd_decode_stream(http_stream)?;
     let mut unpacker = tar_utils::unarchive_stream(decoder);
     unpacker.unpack(&dest).map_err(Error::Streaming)?;
-    info!("Download complete.");
     Ok(())
 }
diff --git a/src/subcommands/archive/unpack/file_stream.rs b/src/subcommands/archive/unpack/file_stream.rs
index 8a74336..515f9a7 100644
--- a/src/subcommands/archive/unpack/file_stream.rs
+++ b/src/subcommands/archive/unpack/file_stream.rs
@@ -1,9 +1,63 @@
-use std::{fs::OpenOptions, path::Path, result::Result};
+use std::{
+    fs::OpenOptions,
+    io::{Error as IoError, Read},
+    path::Path,
+    result::Result,
+};
 
-use log::info;
+use log::{info, warn};
 
 use super::Error;
-use crate::subcommands::archive::{tar_utils, zstd_utils};
+use crate::{
+    common::progress::ProgressTracker,
+    subcommands::archive::{tar_utils, zstd_utils},
+};
+
+struct FileStream<R: Read> {
+    reader: R,
+    maybe_progress_tracker: Option<ProgressTracker>,
+}
+
+impl<R: Read> FileStream<R> {
+    fn new(reader: R, maybe_len: Option<usize>) -> Self {
+        let mut maybe_progress_tracker = None;
+        match maybe_len {
+            Some(len) => match ProgressTracker::new(
+                len,
+                Box::new(|completion| {
+                    info!(
+                        "Archive reading and decompressing {}% complete...",
+                        completion
+                    )
+                }),
+            ) {
+                Ok(progress_tracker) => maybe_progress_tracker = Some(progress_tracker),
+                Err(progress_tracker_error) => {
+                    warn!(
+                        "Couldn't initialize progress tracker: {}",
+                        progress_tracker_error
+                    )
+                }
+            },
+            None => warn!("Unable to read file size, progress will not be logged."),
+        }
+
+        Self {
+            reader,
+            maybe_progress_tracker,
+        }
+    }
+}
+
+impl<R: Read> Read for FileStream<R> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
+        let bytes_read = self.reader.read(buf)?;
+        if let Some(progress_tracker) = self.maybe_progress_tracker.as_mut() {
+            progress_tracker.advance_by(bytes_read);
+        }
+        Ok(bytes_read)
+    }
+}
 
 pub fn file_stream_and_unpack_archive<P1: AsRef<Path>, P2: AsRef<Path>>(
     path: P1,
@@ -13,10 +67,13 @@
         .read(true)
         .open(path)
         .map_err(Error::Source)?;
-
-    let decoder = zstd_utils::zstd_decode_stream(input_file)?;
+    let file_len: Option<usize> = input_file
+        .metadata()
+        .ok()
+        .and_then(|metadata| metadata.len().try_into().ok());
+    let file_stream = FileStream::new(input_file, file_len);
+    let decoder = zstd_utils::zstd_decode_stream(file_stream)?;
     let mut unpacker = tar_utils::unarchive_stream(decoder);
     unpacker.unpack(dest).map_err(Error::Streaming)?;
-    info!("Decompression complete.");
     Ok(())
 }
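// Sketch (not part of the diff): the decorator pattern shared by `HttpStream`
// and `FileStream` above. Any `Read` can be wrapped to observe bytes as they
// flow towards the zstd decoder; `CountingReader` here is hypothetical.

use std::io::{Read, Result as IoResult};

struct CountingReader<R: Read> {
    inner: R,
    total: usize,
}

impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        let bytes_read = self.inner.read(buf)?;
        // Observe progress without altering the data stream.
        self.total += bytes_read;
        Ok(bytes_read)
    }
}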
diff --git a/src/subcommands/archive/unpack/tests.rs b/src/subcommands/archive/unpack/tests.rs
index ece013e..decdf76 100644
--- a/src/subcommands/archive/unpack/tests.rs
+++ b/src/subcommands/archive/unpack/tests.rs
@@ -207,8 +207,8 @@ fn archive_unpack_existing_destination() {
     // Create the destination file before downloading.
     let _ = File::create(&dest_path).unwrap();
 
-    // Download should fail because the file is already present. Address doesn't
-    // matter because the file check is performed first.
+    // Download should fail because a file is already present in the destination
+    // directory. Address doesn't matter because the file check is performed first.
     assert!(download_stream::download_and_unpack_archive("bogus_address", dest_path).is_err());
 }
diff --git a/src/subcommands/check.rs b/src/subcommands/check.rs
index fb86d8e..d895cd0 100644
--- a/src/subcommands/check.rs
+++ b/src/subcommands/check.rs
@@ -1,11 +1,14 @@
+use std::path::{Path, PathBuf};
+
 use clap::{Arg, ArgMatches, Command};
-use log::error;
+use lmdb::Error as LmdbError;
+use thiserror::Error as ThisError;
 
 use crate::common::db::{
     db_env, BlockBodyDatabase, BlockBodyMerkleDatabase, BlockHeaderDatabase, BlockMetadataDatabase,
-    Database, DeployDatabase, DeployHashesDatabase, DeployMetadataDatabase, Error,
+    Database, DeployDatabase, DeployHashesDatabase, DeployMetadataDatabase, Error as DbError,
     FinalizedApprovalsDatabase, ProposerDatabase, StateStoreDatabase, TransferDatabase,
-    TransferHashesDatabase,
+    TransferHashesDatabase, STORAGE_FILE_NAME,
 };
 
 pub const COMMAND_NAME: &str = "check";
@@ -21,6 +24,16 @@ enum DisplayOrder {
     StartAt,
 }
 
+#[derive(ThisError, Debug)]
+pub enum Error {
+    #[error("Error checking the database: {0}")]
+    Database(#[from] DbError),
+    #[error("Error initializing lmdb environment at {0}: {1}")]
+    Path(PathBuf, LmdbError),
+    #[error("Unknown database {0}")]
+    UnknownDb(String),
+}
+
 pub fn command(display_order: usize) -> Command<'static> {
     Command::new(COMMAND_NAME)
         .about(
@@ -46,7 +59,7 @@
             .long(DB_PATH)
             .takes_value(true)
             .value_name("DB_PATH")
-            .help("Path to the storage.lmdb file."),
+            .help("Path of the directory with the `storage.lmdb` file."),
         )
         .arg(
             Arg::new(SPECIFIC)
@@ -73,47 +86,45 @@
     )
 }
 
-pub fn run(matches: &ArgMatches) -> bool {
+pub fn run(matches: &ArgMatches) -> Result<(), Error> {
     let path = matches.value_of(DB_PATH).unwrap();
     let failfast = !matches.is_present(NO_FAILFAST);
     let specific = matches.value_of(SPECIFIC);
     let start_at: usize = matches
         .value_of(START_AT)
-        .unwrap()
+        .expect("should have a default")
         .parse()
-        .expect("Value of \"--start-at\" must be an integer.");
-
-    let result = check_db(path, failfast, specific, start_at);
-
-    if let Err(error) = &result {
-        error!("Database check failed. {}", error);
-    }
+        .unwrap_or_else(|_| panic!("Value of \"--{}\" must be an integer.", START_AT));
 
-    result.is_ok()
+    check_db(path, failfast, specific, start_at)
 }
 
-fn check_db(
-    path: &str,
+fn check_db<P: AsRef<Path>>(
+    path: P,
     failfast: bool,
     specific: Option<&str>,
     start_at: usize,
 ) -> Result<(), Error> {
-    let env = db_env(path).expect("Failed to initialize DB environment");
+    let storage_path = path.as_ref().join(STORAGE_FILE_NAME);
+    let env = db_env(&storage_path)
+        .map_err(|lmdb_err| Error::Path(path.as_ref().to_path_buf(), lmdb_err))?;
     if let Some(db_name) = specific {
         match db_name.trim() {
-            "block_body" => BlockBodyDatabase::check_db(&env, failfast, start_at),
-            "block_body_merkle" => BlockBodyMerkleDatabase::check_db(&env, failfast, start_at),
-            "block_header" => BlockHeaderDatabase::check_db(&env, failfast, start_at),
-            "block_metadata" => BlockMetadataDatabase::check_db(&env, failfast, start_at),
-            "deploy_hashes" => DeployHashesDatabase::check_db(&env, failfast, start_at),
-            "deploy_metadata" => DeployMetadataDatabase::check_db(&env, failfast, start_at),
-            "deploys" => DeployDatabase::check_db(&env, failfast, start_at),
-            "finalized_approvals" => FinalizedApprovalsDatabase::check_db(&env, failfast, start_at),
-            "proposers" => ProposerDatabase::check_db(&env, failfast, start_at),
-            "state_store" => StateStoreDatabase::check_db(&env, failfast, start_at),
-            "transfer" => TransferDatabase::check_db(&env, failfast, start_at),
-            "transfer_hashes" => TransferHashesDatabase::check_db(&env, failfast, start_at),
-            _ => panic!("Database {} not found.", db_name),
+            "block_body" => BlockBodyDatabase::check_db(&env, failfast, start_at)?,
+            "block_body_merkle" => BlockBodyMerkleDatabase::check_db(&env, failfast, start_at)?,
+            "block_header" => BlockHeaderDatabase::check_db(&env, failfast, start_at)?,
+            "block_metadata" => BlockMetadataDatabase::check_db(&env, failfast, start_at)?,
+            "deploy_hashes" => DeployHashesDatabase::check_db(&env, failfast, start_at)?,
+            "deploy_metadata" => DeployMetadataDatabase::check_db(&env, failfast, start_at)?,
+            "deploys" => DeployDatabase::check_db(&env, failfast, start_at)?,
+            "finalized_approvals" => {
+                FinalizedApprovalsDatabase::check_db(&env, failfast, start_at)?
+            }
+            "proposers" => ProposerDatabase::check_db(&env, failfast, start_at)?,
+            "state_store" => StateStoreDatabase::check_db(&env, failfast, start_at)?,
+            "transfer" => TransferDatabase::check_db(&env, failfast, start_at)?,
+            "transfer_hashes" => TransferHashesDatabase::check_db(&env, failfast, start_at)?,
+            _ => return Err(Error::UnknownDb(db_name.to_string())),
         }
     } else {
         // Sanity check for `start_at`, already validated in arg parser.
{}", error); - } + .unwrap_or_else(|_| panic!("Value of \"--{}\" must be an integer.", START_AT)); - result.is_ok() + check_db(path, failfast, specific, start_at) } -fn check_db( - path: &str, +fn check_db>( + path: P, failfast: bool, specific: Option<&str>, start_at: usize, ) -> Result<(), Error> { - let env = db_env(path).expect("Failed to initialize DB environment"); + let storage_path = path.as_ref().join(STORAGE_FILE_NAME); + let env = db_env(&storage_path) + .map_err(|lmdb_err| Error::Path(path.as_ref().to_path_buf(), lmdb_err))?; if let Some(db_name) = specific { match db_name.trim() { - "block_body" => BlockBodyDatabase::check_db(&env, failfast, start_at), - "block_body_merkle" => BlockBodyMerkleDatabase::check_db(&env, failfast, start_at), - "block_header" => BlockHeaderDatabase::check_db(&env, failfast, start_at), - "block_metadata" => BlockMetadataDatabase::check_db(&env, failfast, start_at), - "deploy_hashes" => DeployHashesDatabase::check_db(&env, failfast, start_at), - "deploy_metadata" => DeployMetadataDatabase::check_db(&env, failfast, start_at), - "deploys" => DeployDatabase::check_db(&env, failfast, start_at), - "finalized_approvals" => FinalizedApprovalsDatabase::check_db(&env, failfast, start_at), - "proposers" => ProposerDatabase::check_db(&env, failfast, start_at), - "state_store" => StateStoreDatabase::check_db(&env, failfast, start_at), - "transfer" => TransferDatabase::check_db(&env, failfast, start_at), - "transfer_hashes" => TransferHashesDatabase::check_db(&env, failfast, start_at), - _ => panic!("Database {} not found.", db_name), + "block_body" => BlockBodyDatabase::check_db(&env, failfast, start_at)?, + "block_body_merkle" => BlockBodyMerkleDatabase::check_db(&env, failfast, start_at)?, + "block_header" => BlockHeaderDatabase::check_db(&env, failfast, start_at)?, + "block_metadata" => BlockMetadataDatabase::check_db(&env, failfast, start_at)?, + "deploy_hashes" => DeployHashesDatabase::check_db(&env, failfast, start_at)?, + "deploy_metadata" => DeployMetadataDatabase::check_db(&env, failfast, start_at)?, + "deploys" => DeployDatabase::check_db(&env, failfast, start_at)?, + "finalized_approvals" => { + FinalizedApprovalsDatabase::check_db(&env, failfast, start_at)? + } + "proposers" => ProposerDatabase::check_db(&env, failfast, start_at)?, + "state_store" => StateStoreDatabase::check_db(&env, failfast, start_at)?, + "transfer" => TransferDatabase::check_db(&env, failfast, start_at)?, + "transfer_hashes" => TransferHashesDatabase::check_db(&env, failfast, start_at)?, + _ => return Err(Error::UnknownDb(db_name.to_string())), } } else { // Sanity check for `start_at`, already validated in arg parser. 
diff --git a/src/subcommands/latest_block_summary/block_info.rs b/src/subcommands/latest_block_summary/block_info.rs
index 34c8aa6..381122d 100644
--- a/src/subcommands/latest_block_summary/block_info.rs
+++ b/src/subcommands/latest_block_summary/block_info.rs
@@ -60,16 +60,15 @@ impl BlockInfo {
 
 pub fn parse_network_name<P: AsRef<Path>>(path: P) -> Result<String, IoError> {
     let canon_path = fs::canonicalize(path)?;
-    let network_name = canon_path
-        .parent()
-        .ok_or_else(|| IoError::from(ErrorKind::NotFound))?
-        .file_name()
-        .ok_or_else(|| {
-            IoError::new(
-                ErrorKind::InvalidInput,
-                "Path cannot be represented in UTF-8",
-            )
-        })?;
+    if !canon_path.is_dir() {
+        return Err(IoError::new(ErrorKind::InvalidInput, "Not a directory"));
+    }
+    let network_name = canon_path.file_name().ok_or_else(|| {
+        IoError::new(
+            ErrorKind::InvalidInput,
+            "Path cannot be represented in UTF-8",
+        )
+    })?;
     network_name
         .to_str()
         .ok_or_else(|| {
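// Illustration (not part of the diff; the paths are hypothetical): with the
// reworked logic the network name is the final component of the canonicalized
// db directory itself, not of its parent as before.
//
//   db-path = /var/lib/casper/casper-node/mainnet
//   parse_network_name(db_path) == "mainnet"
//
// A file path (e.g. one ending in `storage.lmdb`) is now rejected with an
// `InvalidInput` error, as exercised by the updated tests further below.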
diff --git a/src/subcommands/latest_block_summary/read_db.rs b/src/subcommands/latest_block_summary/read_db.rs
index 4019a42..3877936 100644
--- a/src/subcommands/latest_block_summary/read_db.rs
+++ b/src/subcommands/latest_block_summary/read_db.rs
@@ -1,46 +1,58 @@
 use std::{
     fs::OpenOptions,
-    io::{self, Error as IoError, Write},
+    io::{self, Write},
     path::Path,
     result::Result,
 };
 
-use bincode::Error as BincodeError;
-use lmdb::{Cursor, Environment, Error as LmdbError, Transaction};
-use log::{error, warn};
+use lmdb::{Cursor, Environment, Transaction};
+use log::{info, warn};
 use serde_json::{self, Error as SerializationError};
-use thiserror::Error;
 
 use casper_node::types::BlockHeader;
 
-use crate::common::db::{self, BlockHeaderDatabase, Database};
-
-use super::block_info::{parse_network_name, BlockInfo};
-
-/// Errors encountered when operating on the storage database.
-#[derive(Debug, Error)]
-pub enum Error {
-    #[error("No blocks found in the block header database")]
-    EmptyDatabase,
-    /// Parsing error on entry at index in the database.
-    #[error("Error parsing element {0}: {1}")]
-    Parsing(usize, BincodeError),
-    /// Database operation error.
-    #[error("Error operating the database: {0}")]
-    Database(#[from] LmdbError),
-    #[error("Error serializing output: {0}")]
-    Serialize(#[from] SerializationError),
-    #[error("Error writing output: {0}")]
-    Output(#[from] IoError),
-}
+use crate::common::{
+    db::{self, BlockHeaderDatabase, Database, STORAGE_FILE_NAME},
+    lmdb_utils,
+    progress::ProgressTracker,
+};
+
+use super::{
+    block_info::{parse_network_name, BlockInfo},
+    Error,
+};
 
-fn get_highest_block(env: &Environment) -> Result<BlockHeader, Error> {
+fn get_highest_block(env: &Environment, log_progress: bool) -> Result<BlockHeader, Error> {
     let txn = env.begin_ro_txn()?;
     let db = unsafe { txn.open_db(Some(BlockHeaderDatabase::db_name()))? };
 
     let mut max_height = 0u64;
     let mut max_height_key = None;
+
+    let maybe_entry_count = lmdb_utils::entry_count(&txn, db).ok();
+    let mut maybe_progress_tracker = None;
+
     if let Ok(mut cursor) = txn.open_ro_cursor(db) {
+        if log_progress {
+            match maybe_entry_count {
+                Some(entry_count) => {
+                    match ProgressTracker::new(
+                        entry_count,
+                        Box::new(|completion| {
+                            info!("Database parsing {}% complete...", completion)
+                        }),
+                    ) {
+                        Ok(progress_tracker) => maybe_progress_tracker = Some(progress_tracker),
+                        Err(progress_tracker_error) => warn!(
+                            "Couldn't initialize progress tracker: {}",
+                            progress_tracker_error
+                        ),
+                    }
+                }
+                None => warn!("Unable to count db entries, progress will not be logged."),
+            }
+        }
+
         for (idx, (raw_key, raw_val)) in cursor.iter().enumerate() {
             let header: BlockHeader = bincode::deserialize(raw_val)
                 .map_err(|bincode_err| Error::Parsing(idx, bincode_err))?;
@@ -48,6 +60,10 @@
                 max_height = header.height();
                 let _ = max_height_key.replace(raw_key);
             }
+
+            if let Some(progress_tracker) = maybe_progress_tracker.as_mut() {
+                progress_tracker.advance_by(1);
+            }
         }
     }
 
@@ -76,12 +92,19 @@ pub(crate) fn dump_block_info(
 pub fn latest_block_summary<P1: AsRef<Path>, P2: AsRef<Path>>(
     db_path: P1,
     output: Option<P2>,
+    overwrite: bool,
 ) -> Result<(), Error> {
-    let env = db::db_env(db_path.as_ref())?;
+    let storage_path = db_path.as_ref().join(STORAGE_FILE_NAME);
+    let env = db::db_env(&storage_path)?;
+    let mut log_progress = false;
     // Validate the output file early so that, in case this fails,
     // we don't unnecessarily read the whole database.
     let out_writer: Box<dyn Write> = if let Some(out_path) = output {
-        let file = OpenOptions::new().create(true).write(true).open(out_path)?;
+        let file = OpenOptions::new()
+            .create_new(!overwrite)
+            .write(true)
+            .open(out_path)?;
+        log_progress = true;
         Box::new(file)
     } else {
         Box::new(io::stdout())
     };
@@ -94,7 +117,7 @@
         }
     };
 
-    let highest_block = get_highest_block(&env)?;
+    let highest_block = get_highest_block(&env, log_progress)?;
     let block_info = BlockInfo::new(network_name, highest_block);
     dump_block_info(&block_info, out_writer)?;
diff --git a/src/subcommands/latest_block_summary/tests.rs b/src/subcommands/latest_block_summary/tests.rs
index 99f20bc..3fe6502 100644
--- a/src/subcommands/latest_block_summary/tests.rs
+++ b/src/subcommands/latest_block_summary/tests.rs
@@ -2,7 +2,7 @@ use std::fs::{self, OpenOptions};
 
 use lmdb::{Transaction, WriteFlags};
 use once_cell::sync::Lazy;
-use tempfile::{self, TempDir};
+use tempfile::{self, NamedTempFile, TempDir};
 
 use casper_node::{
     rpcs::docs::DocExample,
@@ -11,6 +11,7 @@
 use super::block_info::BlockInfo;
 use crate::{
+    common::db::STORAGE_FILE_NAME,
     subcommands::latest_block_summary::{block_info, read_db},
     test_utils::{LmdbTestFixture, MockBlockHeader},
 };
@@ -22,22 +23,24 @@ fn parse_network_name_input() {
     let root_dir = tempfile::tempdir().unwrap();
     let first_node = tempfile::tempdir_in(&root_dir).unwrap();
     let second_node = tempfile::tempdir_in(&first_node).unwrap();
+    let file = NamedTempFile::new_in(first_node.as_ref()).unwrap();
 
     assert_eq!(
         block_info::parse_network_name(&second_node).unwrap(),
-        first_node.path().file_name().unwrap().to_str().unwrap()
+        second_node.path().file_name().unwrap().to_str().unwrap()
     );
     assert_eq!(
         block_info::parse_network_name(&first_node).unwrap(),
-        root_dir.path().file_name().unwrap().to_str().unwrap()
+        first_node.path().file_name().unwrap().to_str().unwrap()
     );
     let relative_path_to_first_node = second_node.as_ref().join("..");
     assert_eq!(
         block_info::parse_network_name(&relative_path_to_first_node).unwrap(),
-        root_dir.path().file_name().unwrap().to_str().unwrap()
+        first_node.path().file_name().unwrap().to_str().unwrap()
     );
 
     assert!(block_info::parse_network_name("/").is_err());
+    assert!(block_info::parse_network_name(file.path()).is_err());
 }
 
 #[test]
@@ -80,9 +83,10 @@ fn dump_without_net_name() {
 #[test]
 fn latest_block_should_succeed() {
-    let fixture = LmdbTestFixture::new(Some("block_header"));
+    let fixture = LmdbTestFixture::new(Some("block_header"), Some(STORAGE_FILE_NAME));
     let out_file_path = OUT_DIR.as_ref().join("latest_block_metadata.json");
 
+    // Create 2 block headers, height 0 and 1.
     let first_block = MockBlockHeader::default();
     let first_block_key = [0u8, 0u8, 0u8];
 
@@ -91,6 +95,7 @@
     second_block.height = 1;
 
     let env = &fixture.env;
+    // Insert the 2 blocks into the database.
    if let Ok(mut txn) = env.begin_rw_txn() {
         txn.put(
             fixture.db,
@@ -109,19 +114,70 @@
         txn.commit().unwrap();
     };
 
-    read_db::latest_block_summary(fixture.tmp_file.path(), Some(out_file_path.as_path())).unwrap();
+    // Get the latest block information and ensure it matches with the second block.
+    read_db::latest_block_summary(
+        fixture.tmp_dir.as_ref(),
+        Some(out_file_path.as_path()),
+        false,
+    )
+    .unwrap();
     let json_str = fs::read_to_string(&out_file_path).unwrap();
     let block_info: BlockInfo = serde_json::from_str(&json_str).unwrap();
     let (mock_block_header_deserialized, _network_name) = block_info.into_mock();
     assert_eq!(mock_block_header_deserialized, second_block);
+
+    // Delete the second block from the database.
+    if let Ok(mut txn) = env.begin_rw_txn() {
+        txn.del(fixture.db, &second_block_key, None).unwrap();
+        txn.commit().unwrap();
+    };
+
+    // Now latest block summary should return information about the first block.
+    // Given that the output exists, another run on the same destination path should fail.
+    assert!(read_db::latest_block_summary(
+        fixture.tmp_dir.as_ref(),
+        Some(out_file_path.as_path()),
+        false
+    )
+    .is_err());
+    // We use `overwrite` on the previous output file.
+    read_db::latest_block_summary(
+        fixture.tmp_dir.as_ref(),
+        Some(out_file_path.as_path()),
+        true,
+    )
+    .unwrap();
+    let json_str = fs::read_to_string(&out_file_path).unwrap();
+    let block_info: BlockInfo = serde_json::from_str(&json_str).unwrap();
+    let (mock_block_header_deserialized, _network_name) = block_info.into_mock();
+    assert_eq!(mock_block_header_deserialized, first_block);
 }
 
 #[test]
 fn latest_block_empty_db_should_fail() {
-    let fixture = LmdbTestFixture::new(Some("block_header_faulty"));
+    let fixture = LmdbTestFixture::new(Some("block_header_faulty"), Some(STORAGE_FILE_NAME));
     let out_file_path = OUT_DIR.as_ref().join("empty.json");
-    assert!(
-        read_db::latest_block_summary(fixture.tmp_file.path(), Some(out_file_path.as_path()))
-            .is_err()
-    );
+    assert!(read_db::latest_block_summary(
+        fixture.tmp_dir.as_ref(),
+        Some(out_file_path.as_path()),
+        false
+    )
+    .is_err());
+}
+
+#[test]
+fn latest_block_existing_output_should_fail() {
+    let fixture = LmdbTestFixture::new(Some("block_header_faulty"), Some(STORAGE_FILE_NAME));
+    let out_file_path = OUT_DIR.as_ref().join("existing.json");
+    let _ = OpenOptions::new()
+        .create_new(true)
+        .write(true)
+        .open(&out_file_path)
+        .unwrap();
+    assert!(read_db::latest_block_summary(
+        fixture.tmp_dir.as_ref(),
+        Some(out_file_path.as_path()),
+        false
+    )
+    .is_err());
 }
diff --git a/src/subcommands/trie_compact.rs b/src/subcommands/trie_compact.rs
index 62af6b7..8f5554f 100644
--- a/src/subcommands/trie_compact.rs
+++ b/src/subcommands/trie_compact.rs
@@ -6,8 +6,15 @@ mod tests;
 // public interface.
 mod utils;
 
+use std::{io::Error as IoError, path::PathBuf};
+
+use anyhow::Error as AnyError;
 use clap::{Arg, ArgMatches, Command};
-use log::error;
+use lmdb::Error as LmdbError;
+use thiserror::Error as ThisError;
+
+use casper_hashing::Digest;
+use casper_node::storage::Error as StorageError;
 
 use compact::DestinationOptions;
 
@@ -20,6 +27,38 @@ const DEFAULT_MAX_DB_SIZE: &str = "483183820800"; // 450 gb
 const SOURCE_TRIE_STORE_PATH: &str = "src-trie";
 const STORAGE_PATH: &str = "storage-path";
 
+/// Possible errors caught while compacting the trie store.
+#[derive(Debug, ThisError)]
+pub enum Error {
+    /// Error copying the state root with a specific digest.
+    #[error("Error copying state root {0}: {1}")]
+    CopyStateRoot(Digest, AnyError),
+    /// Error creating the execution engine for the destination trie.
+    #[error("Error loading the execution engine: {0}")]
+    CreateDestTrie(AnyError),
+    /// Error working with the destination trie path.
+    #[error("Invalid destination: {0}")]
+    InvalidDest(String),
+    /// Path cannot be created/resolved.
+    #[error("Path {0} cannot be created/resolved: {1}")]
+    InvalidPath(PathBuf, IoError),
+    /// Error while operating on LMDB.
+    #[error("Error while operating on LMDB: {0}")]
+    LmdbOperation(LmdbError),
+    /// A block of specific height is missing from the storage.
+    #[error("Storage database is missing block {0}")]
+    MissingBlock(u64),
+    /// Error creating the execution engine for the source trie.
+    #[error("Error creating the execution engine: {0}")]
+    OpenSourceTrie(AnyError),
+    /// Error opening the block/deploys LMDB store.
+    #[error("Error opening the block/deploy storage: {0}")]
+    OpenStorage(AnyError),
+    /// Error while getting a block of specific height from storage.
+    #[error("Storage error while trying to retrieve block {0}: {1}")]
+    Storage(u64, StorageError),
+}
+
 enum DisplayOrder {
     SourcePath,
     DestinationPath,
@@ -108,7 +147,7 @@
     )
 }
 
-pub fn run(matches: &ArgMatches) -> bool {
+pub fn run(matches: &ArgMatches) -> Result<(), Error> {
     let storage_path = matches.value_of(STORAGE_PATH).unwrap();
     let source_trie_path = matches.value_of(SOURCE_TRIE_STORE_PATH).unwrap();
     let destination_trie_path = matches.value_of(DESTINATION_TRIE_STORE_PATH).unwrap();
@@ -124,17 +163,11 @@
         .parse()
         .expect("Value of \"--max-db-size\" must be an integer.");
 
-    let result = compact::trie_compact(
+    compact::trie_compact(
         storage_path,
         source_trie_path,
         destination_trie_path,
         dest_opt,
         max_db_size,
-    );
-
-    if let Err(error) = &result {
-        error!("Trie compact failed. {}", error);
-    }
-
-    result.is_ok()
+    )
 }
diff --git a/src/subcommands/trie_compact/compact.rs b/src/subcommands/trie_compact/compact.rs
index 6fa32a1..61cbf7f 100644
--- a/src/subcommands/trie_compact/compact.rs
+++ b/src/subcommands/trie_compact/compact.rs
@@ -1,20 +1,17 @@
 use std::{
     collections::HashSet,
-    fmt::{Display, Formatter, Result as FmtResult},
     fs::{self, File, OpenOptions},
-    io::Error as IoError,
-    path::{Path, PathBuf},
+    path::Path,
 };
 
-use anyhow::Error as AnyError;
-use lmdb::Error as LmdbError;
 use log::info;
-use thiserror::Error as ThisError;
 
 use casper_hashing::Digest;
-use casper_node::storage::Error as StorageError;
 
-use super::utils::{create_execution_engine, create_storage, load_execution_engine};
+use super::{
+    utils::{create_execution_engine, create_storage, load_execution_engine},
+    Error,
+};
 
 pub(crate) const TRIE_STORE_FILE_NAME: &str = "data.lmdb";
 
@@ -29,62 +26,6 @@ pub enum DestinationOptions {
     New,
 }
 
-/// Possible errors caught while compacting the trie store.
-#[derive(Debug, ThisError)]
-pub enum Error {
-    /// Error copying the state root with a specific digest.
-    CopyStateRoot(Digest, AnyError),
-    /// Error creating the execution engine for the source trie.
-    OpenSourceTrie(AnyError),
-    /// Error opening the block/deploys LMDB store.
-    OpenStorage(AnyError),
-    /// Error working with the destination trie path.
-    InvalidDest(String),
-    /// Path cannot be created/resolved.
-    InvalidPath(PathBuf, IoError),
-    /// Error while operating on LMDB.
-    LmdbOperation(LmdbError),
-    /// Error creating the execution engine for the destination trie.
-    CreateDestTrie(AnyError),
-    /// A block of specific height is missing from the storage.
-    MissingBlock(u64),
-    /// Error while getting a block of specific height from storage.
-    Storage(u64, StorageError),
-}
-
-impl Display for Error {
-    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
-        match self {
-            Self::CopyStateRoot(digest, err) => {
-                write!(f, "Error copying state root {}: {}", digest, err)
-            }
-            Self::CreateDestTrie(err) => {
-                write!(f, "Error loading the execution engine: {}", err)
-            }
-            Self::InvalidDest(msg) => write!(f, "Invalid destination: {}", msg),
-            Self::InvalidPath(path, err) => write!(
-                f,
-                "Path {} cannot be created/resolved: {}",
-                path.to_string_lossy(),
-                err
-            ),
-            Self::LmdbOperation(err) => write!(f, "Error operation on LMDB: {}.", err),
-            Self::MissingBlock(idx) => write!(f, "Storage database is missing block {}.", idx),
-            Self::OpenSourceTrie(err) => {
-                write!(f, "Error creating the execution engine: {}", err)
-            }
-            Self::OpenStorage(err) => {
-                write!(f, "Error opening the block/deploy storage: {}", err)
-            }
-            Self::Storage(idx, err) => write!(
-                f,
-                "Storage error while trying to retrieve block {}: {}",
-                idx, err
-            ),
-        }
-    }
-}
-
 fn validate_trie_paths<P1: AsRef<Path>, P2: AsRef<Path>>(
     source_trie_path: P1,
     destination_trie_path: P2,
+ #[error("Storage error while trying to retrieve block {0}: {1}")] + Storage(u64, StorageError), +} + enum DisplayOrder { SourcePath, DestinationPath, @@ -108,7 +147,7 @@ pub fn command(display_order: usize) -> Command<'static> { ) } -pub fn run(matches: &ArgMatches) -> bool { +pub fn run(matches: &ArgMatches) -> Result<(), Error> { let storage_path = matches.value_of(STORAGE_PATH).unwrap(); let source_trie_path = matches.value_of(SOURCE_TRIE_STORE_PATH).unwrap(); let destination_trie_path = matches.value_of(DESTINATION_TRIE_STORE_PATH).unwrap(); @@ -124,17 +163,11 @@ pub fn run(matches: &ArgMatches) -> bool { .parse() .expect("Value of \"--max-db-size\" must be an integer."); - let result = compact::trie_compact( + compact::trie_compact( storage_path, source_trie_path, destination_trie_path, dest_opt, max_db_size, - ); - - if let Err(error) = &result { - error!("Trie compact failed. {}", error); - } - - result.is_ok() + ) } diff --git a/src/subcommands/trie_compact/compact.rs b/src/subcommands/trie_compact/compact.rs index 6fa32a1..61cbf7f 100644 --- a/src/subcommands/trie_compact/compact.rs +++ b/src/subcommands/trie_compact/compact.rs @@ -1,20 +1,17 @@ use std::{ collections::HashSet, - fmt::{Display, Formatter, Result as FmtResult}, fs::{self, File, OpenOptions}, - io::Error as IoError, - path::{Path, PathBuf}, + path::Path, }; -use anyhow::Error as AnyError; -use lmdb::Error as LmdbError; use log::info; -use thiserror::Error as ThisError; use casper_hashing::Digest; -use casper_node::storage::Error as StorageError; -use super::utils::{create_execution_engine, create_storage, load_execution_engine}; +use super::{ + utils::{create_execution_engine, create_storage, load_execution_engine}, + Error, +}; pub(crate) const TRIE_STORE_FILE_NAME: &str = "data.lmdb"; @@ -29,62 +26,6 @@ pub enum DestinationOptions { New, } -/// Possible errors caught while compacting the trie store. -#[derive(Debug, ThisError)] -pub enum Error { - /// Error copying the state root with a specific digest. - CopyStateRoot(Digest, AnyError), - /// Error creating the execution engine for the source trie. - OpenSourceTrie(AnyError), - /// Error opening the block/deploys LMDB store. - OpenStorage(AnyError), - /// Error working with the destination trie path. - InvalidDest(String), - /// Path cannot be created/resolved. - InvalidPath(PathBuf, IoError), - /// Error while operating on LMDB. - LmdbOperation(LmdbError), - /// Error creating the execution engine for the destination trie. - CreateDestTrie(AnyError), - /// A block of specific height is missing from the storage. - MissingBlock(u64), - /// Error while getting a block of specific height from storage. 
diff --git a/src/subcommands/unsparse.rs b/src/subcommands/unsparse.rs
index bd3165b..ac4ac84 100644
--- a/src/subcommands/unsparse.rs
+++ b/src/subcommands/unsparse.rs
@@ -1,12 +1,27 @@
-use std::{fs, path::Path};
+use std::{
+    fs,
+    io::Error as IoError,
+    path::{Path, PathBuf},
+};
 
 use clap::{Arg, ArgMatches, Command};
-use lmdb::{Environment, EnvironmentFlags};
+use lmdb::{Environment, EnvironmentFlags, Error as LmdbError};
 use log::{error, info};
+use thiserror::Error as ThisError;
 
 pub const COMMAND_NAME: &str = "unsparse";
 const DB_PATH: &str = "file-path";
 
+#[derive(ThisError, Debug)]
+pub enum Error {
+    #[error("Failed to get metadata for {0}: {1}")]
+    Metadata(PathBuf, IoError),
+    #[error("Failed to open lmdb database at {0}: {1}")]
+    Lmdb(PathBuf, LmdbError),
+    #[error("Failed to reduce size of {0} from {1} bytes")]
+    Size(PathBuf, u64),
+}
+
 pub fn command(display_order: usize) -> Command<'static> {
     Command::new(COMMAND_NAME)
         .display_order(display_order)
@@ -23,7 +38,7 @@
     )
 }
 
-pub fn run(matches: &ArgMatches) -> bool {
+pub fn run(matches: &ArgMatches) -> Result<(), Error> {
     let path = Path::new(
         matches
             .value_of(DB_PATH)
@@ -32,35 +47,21 @@
     unsparse(path)
 }
 
-fn unsparse(path: &Path) -> bool {
-    let size_before = match fs::metadata(path) {
-        Ok(metadata) => metadata.len(),
-        Err(error) => {
-            error!("Failed to get metadata for {}: {}", path.display(), error);
-            return false;
-        }
-    };
+fn unsparse(path: &Path) -> Result<(), Error> {
+    let size_before = fs::metadata(path)
+        .map(|metadata| metadata.len())
+        .map_err(|io_err| Error::Metadata(path.to_path_buf(), io_err))?;
 
-    let _env = match Environment::new()
+    let _env = Environment::new()
         .set_flags(EnvironmentFlags::WRITE_MAP | EnvironmentFlags::NO_SUB_DIR)
         .set_max_dbs(100)
         .set_map_size(1)
         .open(path)
-    {
-        Ok(env) => env,
-        Err(error) => {
-            error!("Failed to open {}: {}", path.display(), error);
-            return false;
-        }
-    };
+        .map_err(|lmdb_err| Error::Lmdb(path.to_path_buf(), lmdb_err))?;
 
-    let size_after = match fs::metadata(path) {
-        Ok(metadata) => metadata.len(),
-        Err(error) => {
-            error!("Failed to get metadata for {}: {}", path.display(), error);
-            return false;
-        }
-    };
+    let size_after = fs::metadata(path)
+        .map(|metadata| metadata.len())
+        .map_err(|io_err| Error::Metadata(path.to_path_buf(), io_err))?;
 
     if size_before > size_after {
         info!(
@@ -69,14 +70,14 @@
             size_before,
             size_after
         );
-        true
+        Ok(())
     } else {
         error!(
             "Failed to reduce size of {} from {} bytes.",
             path.display(),
             size_before
        );
-        false
+        Err(Error::Size(path.to_path_buf(), size_before))
     }
 }
 
@@ -87,8 +88,8 @@ mod tests {
 
     #[test]
     fn should_reduce_lmdb_file_size() {
-        let fixture = LmdbTestFixture::new(Some("a"));
-        let db_path = fixture.tmp_file.path();
+        let fixture = LmdbTestFixture::new(Some("a"), None);
+        let db_path = fixture.file_path.as_path();
         let db_size = || {
             fs::metadata(db_path)
                 .unwrap_or_else(|error| {
                     panic!("couldn't get metadata for {}: {}", db_path.display(), error)
                 })
                .len()
         };
         let size_before = db_size();
-        assert!(unsparse(db_path), "unsparse should succeed");
+        unsparse(db_path).expect("unsparse should succeed");
         let size_after = db_size();
         assert!(size_after < size_before, "unsparse should reduce file size");
 
-        assert!(!unsparse(db_path), "repeat unsparse should fail");
+        assert!(unsparse(db_path).is_err(), "repeat unsparse should fail");
         assert_eq!(db_size(), size_after, "file size should be unchanged");
     }
 }
diff --git a/src/test_utils.rs b/src/test_utils.rs
index 37ac84c..3f11dac 100644
--- a/src/test_utils.rs
+++ b/src/test_utils.rs
@@ -1,8 +1,10 @@
 #![cfg(test)]
 
+use std::{fs::OpenOptions, path::PathBuf};
+
 use lmdb::{Database as LmdbDatabase, DatabaseFlags, Environment, EnvironmentFlags};
 use serde::{Deserialize, Serialize};
-use tempfile::NamedTempFile;
+use tempfile::{NamedTempFile, TempDir};
 
 use casper_hashing::Digest;
 use casper_node::types::{BlockHash, Timestamp};
@@ -11,12 +13,33 @@ use casper_types::{EraId, ProtocolVersion};
 pub struct LmdbTestFixture {
     pub env: Environment,
     pub db: LmdbDatabase,
-    pub tmp_file: NamedTempFile,
+    pub tmp_dir: TempDir,
+    pub file_path: PathBuf,
 }
 
 impl LmdbTestFixture {
-    pub fn new(name: Option<&str>) -> Self {
-        let tmp_file = NamedTempFile::new().unwrap();
+    pub fn new(name: Option<&str>, file_name: Option<&str>) -> Self {
+        let tmp_dir = tempfile::tempdir().unwrap();
+        let file_path = if let Some(name) = file_name {
+            let path = tmp_dir.as_ref().join(name);
+            let _ = OpenOptions::new()
+                .create_new(true)
+                .write(true)
+                .open(&path)
+                .unwrap();
+            path
+        } else {
+            let path = NamedTempFile::new_in(tmp_dir.as_ref())
+                .unwrap()
+                .path()
+                .to_path_buf();
+            let _ = OpenOptions::new()
+                .create_new(true)
+                .write(true)
+                .open(&path)
+                .unwrap();
+            path
+        };
         let env = Environment::new()
             .set_flags(
                 EnvironmentFlags::WRITE_MAP
@@ -27,12 +50,17 @@
             .set_max_readers(12)
             .set_map_size(4096 * 10)
             .set_max_dbs(10)
-            .open(tmp_file.path())
+            .open(&file_path)
             .expect("can't create environment");
         let db = env
             .create_db(name, DatabaseFlags::empty())
             .expect("can't create database");
-        LmdbTestFixture { env, db, tmp_file }
+        LmdbTestFixture {
+            env,
+            db,
+            tmp_dir,
+            file_path,
+        }
     }
 }