From 8d0157a969a9388ca1f942709837a5a3e591d155 Mon Sep 17 00:00:00 2001 From: 0x009922 <43530070+0x009922@users.noreply.github.com> Date: Thu, 14 Mar 2024 17:42:51 +0900 Subject: [PATCH] [feature] #4059: introduce snapshot "mode" (#4365) Signed-off-by: Dmitry Balashov <43530070+0x009922@users.noreply.github.com> --- cli/src/lib.rs | 52 +++-- config/src/kura.rs | 12 +- config/src/lib.rs | 1 + config/src/parameters/actual.rs | 4 +- config/src/parameters/defaults.rs | 3 +- config/src/parameters/user.rs | 17 +- config/src/parameters/user/boilerplate.rs | 23 +- config/src/snapshot.rs | 36 ++++ config/tests/fixtures.rs | 8 +- config/tests/fixtures/full.env | 2 +- config/tests/fixtures/full.toml | 4 +- configs/peer.template.toml | 2 +- core/benches/kura.rs | 2 +- core/src/kura.rs | 20 +- core/src/queue.rs | 3 +- core/src/snapshot.rs | 244 ++++++++++++++++------ 16 files changed, 297 insertions(+), 136 deletions(-) create mode 100644 config/src/snapshot.rs diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 996fc52a76a..9ebcce6f929 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -20,7 +20,9 @@ use iroha_core::{ query::store::LiveQueryStore, queue::Queue, smartcontracts::isi::Registrable as _, - snapshot::{try_read_snapshot, SnapshotMaker, SnapshotMakerHandle}, + snapshot::{ + try_read_snapshot, SnapshotMaker, SnapshotMakerHandle, TryReadError as TryReadSnapshotError, + }, sumeragi::{SumeragiHandle, SumeragiStartArgs}, IrohaNetwork, }; @@ -57,8 +59,8 @@ pub struct Iroha { pub kura: Arc, /// Torii web server pub torii: Option, - /// Snapshot service - pub snapshot_maker: SnapshotMakerHandle, + /// Snapshot service. Might be not started depending on the config. + pub snapshot_maker: Option, /// Thread handlers thread_handlers: Vec, @@ -215,30 +217,37 @@ impl Iroha { let live_query_store_handle = LiveQueryStore::from_config(config.live_query_store).start(); let block_count = kura.init()?; - let wsv = try_read_snapshot( + + let wsv = match try_read_snapshot( &config.snapshot.store_dir, &kura, live_query_store_handle.clone(), block_count, - ) - .map_or_else( - |error| { - iroha_logger::warn!(%error, "Failed to load wsv from snapshot, creating empty wsv"); - WorldStateView::from_config( - config.chain_wide, - world, - Arc::clone(&kura), - live_query_store_handle.clone(), - ) - }, - |wsv| { + ) { + Ok(wsv) => { iroha_logger::info!( at_height = wsv.height(), - "Successfully loaded wsv from snapshot" + "Successfully loaded WSV from a snapshot" ); - wsv - }, - ); + Some(wsv) + } + Err(TryReadSnapshotError::NotFound) => { + iroha_logger::info!("Didn't find a snapshot of WSV, creating an empty one"); + None + } + Err(error) => { + iroha_logger::warn!(%error, "Failed to load WSV from a snapshot, creating an empty one"); + None + } + }.unwrap_or_else(|| { + WorldStateView::from_config( + config.chain_wide, + world, + Arc::clone(&kura), + live_query_store_handle.clone(), + ) + + }); let queue = Arc::new(Queue::from_config(config.queue)); match Self::start_telemetry(&logger, &config).await? { @@ -298,7 +307,8 @@ impl Iroha { } .start(); - let snapshot_maker = SnapshotMaker::from_config(&config.snapshot, sumeragi.clone()).start(); + let snapshot_maker = + SnapshotMaker::from_config(&config.snapshot, &sumeragi).map(SnapshotMaker::start); let kiso = KisoHandle::new(config.clone()); diff --git a/config/src/kura.rs b/config/src/kura.rs index 507e44db3da..30d0af9a4de 100644 --- a/config/src/kura.rs +++ b/config/src/kura.rs @@ -18,7 +18,7 @@ use serde_with::{DeserializeFromStr, SerializeDisplay}; SerializeDisplay, )] #[strum(serialize_all = "snake_case")] -pub enum Mode { +pub enum InitMode { /// Strict validation of all blocks. #[default] Strict, @@ -28,13 +28,13 @@ pub enum Mode { #[cfg(test)] mod tests { - use crate::kura::Mode; + use crate::kura::InitMode; #[test] fn init_mode_display_reprs() { - assert_eq!(format!("{}", Mode::Strict), "strict"); - assert_eq!(format!("{}", Mode::Fast), "fast"); - assert_eq!("strict".parse::().unwrap(), Mode::Strict); - assert_eq!("fast".parse::().unwrap(), Mode::Fast); + assert_eq!(format!("{}", InitMode::Strict), "strict"); + assert_eq!(format!("{}", InitMode::Fast), "fast"); + assert_eq!("strict".parse::().unwrap(), InitMode::Strict); + assert_eq!("fast".parse::().unwrap(), InitMode::Fast); } } diff --git a/config/src/lib.rs b/config/src/lib.rs index 1697443be46..8776dac64e4 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -6,3 +6,4 @@ pub mod client_api; pub mod kura; pub mod logger; pub mod parameters; +pub mod snapshot; diff --git a/config/src/parameters/actual.rs b/config/src/parameters/actual.rs index 249553d7195..df47ba9c2ee 100644 --- a/config/src/parameters/actual.rs +++ b/config/src/parameters/actual.rs @@ -19,7 +19,7 @@ use url::Url; pub use user::{Logger, Queue, Snapshot}; use crate::{ - kura::Mode, + kura::InitMode, parameters::{ defaults, user, user::{CliContext, RootPartial}, @@ -119,7 +119,7 @@ impl Genesis { #[allow(missing_docs)] #[derive(Debug, Clone)] pub struct Kura { - pub init_mode: Mode, + pub init_mode: InitMode, pub store_dir: PathBuf, pub debug_output_new_blocks: bool, } diff --git a/config/src/parameters/defaults.rs b/config/src/parameters/defaults.rs index a6f779d087b..ff55704ee09 100644 --- a/config/src/parameters/defaults.rs +++ b/config/src/parameters/defaults.rs @@ -46,9 +46,8 @@ pub mod snapshot { use super::*; pub const DEFAULT_STORE_DIR: &str = "./storage/snapshot"; - // Default frequency of making snapshots is 1 minute, need to be adjusted for larger world state view size + // The default frequency of making snapshots is 1 minute, need to be adjusted for larger world state view size pub const DEFAULT_CREATE_EVERY: Duration = Duration::from_secs(60); - pub const DEFAULT_ENABLED: bool = true; } pub mod chain_wide { diff --git a/config/src/parameters/user.rs b/config/src/parameters/user.rs index d4e523b4732..d395f50fbb6 100644 --- a/config/src/parameters/user.rs +++ b/config/src/parameters/user.rs @@ -31,9 +31,10 @@ use iroha_primitives::{addr::SocketAddr, unique_vec::UniqueVec}; use url::Url; use crate::{ - kura::Mode, - logger::Format, + kura::InitMode as KuraInitMode, + logger::Format as LoggerFormat, parameters::{actual, defaults::telemetry::*}, + snapshot::Mode as SnapshotMode, }; mod boilerplate; @@ -343,7 +344,7 @@ pub enum GenesisConfigError { #[derive(Debug)] pub struct Kura { - pub init_mode: Mode, + pub init_mode: KuraInitMode, pub store_dir: PathBuf, pub debug: KuraDebug, } @@ -469,7 +470,7 @@ pub struct Logger { // looks inconsistent pub level: Level, /// Output format - pub format: Format, + pub format: LoggerFormat, #[cfg(feature = "tokio-console")] /// Address of tokio console (only available under "tokio-console" feature) pub tokio_console_address: SocketAddr, @@ -480,7 +481,7 @@ impl Default for Logger { fn default() -> Self { Self { level: Level::default(), - format: Format::default(), + format: LoggerFormat::default(), #[cfg(feature = "tokio-console")] tokio_console_address: super::defaults::logger::DEFAULT_TOKIO_CONSOLE_ADDR, } @@ -541,9 +542,9 @@ impl Telemetry { #[derive(Debug, Clone)] pub struct Snapshot { + pub mode: SnapshotMode, pub create_every: Duration, pub store_dir: PathBuf, - pub creation_enabled: bool, } #[derive(Debug, Copy, Clone)] @@ -574,7 +575,7 @@ impl ChainWide { asset_definition_metadata_limits, account_metadata_limits, domain_metadata_limits, - ident_length_limits: identifier_length_limits, + ident_length_limits, executor_fuel_limit, executor_max_memory, wasm_fuel_limit, @@ -590,7 +591,7 @@ impl ChainWide { asset_definition_metadata_limits, account_metadata_limits, domain_metadata_limits, - ident_length_limits: identifier_length_limits, + ident_length_limits, executor_runtime: actual::WasmRuntime { fuel_limit: executor_fuel_limit, max_memory_bytes: executor_max_memory.get(), diff --git a/config/src/parameters/user/boilerplate.rs b/config/src/parameters/user/boilerplate.rs index 4fc88772ad5..6ad2129db0c 100644 --- a/config/src/parameters/user/boilerplate.rs +++ b/config/src/parameters/user/boilerplate.rs @@ -26,7 +26,7 @@ use serde::{Deserialize, Serialize}; use url::Url; use crate::{ - kura::Mode, + kura::InitMode as KuraInitMode, logger::Format, parameters::{ defaults::{self, chain_wide::*, network::*, queue::*, torii::*}, @@ -36,6 +36,7 @@ use crate::{ SumeragiDebug, Telemetry, TelemetryDev, Torii, }, }, + snapshot::Mode as SnapshotMode, }; #[derive(Deserialize, Serialize, Debug, Default, Merge)] @@ -260,7 +261,7 @@ impl FromEnv for GenesisPartial { #[derive(Clone, Deserialize, Serialize, Debug, Default, Merge)] #[serde(deny_unknown_fields, default)] pub struct KuraPartial { - pub init_mode: UserField, + pub init_mode: UserField, pub store_dir: UserField, pub debug: KuraDebugPartial, } @@ -595,9 +596,9 @@ impl FromEnvDefaultFallback for TelemetryPartial {} #[derive(Debug, Clone, Deserialize, Serialize, Default, Merge)] #[serde(deny_unknown_fields, default)] pub struct SnapshotPartial { + pub mode: UserField, pub create_every: UserField, pub store_dir: UserField, - pub creation_enabled: UserField, } impl UnwrapPartial for SnapshotPartial { @@ -605,9 +606,7 @@ impl UnwrapPartial for SnapshotPartial { fn unwrap_partial(self) -> UnwrapPartialResult { Ok(Snapshot { - creation_enabled: self - .creation_enabled - .unwrap_or(defaults::snapshot::DEFAULT_ENABLED), + mode: self.mode.unwrap_or_default(), create_every: self .create_every .get() @@ -627,6 +626,9 @@ impl FromEnv for SnapshotPartial { { let mut emitter = Emitter::new(); + let mode = + ParseEnvResult::parse_simple(&mut emitter, env, "SNAPSHOT_MODE", "snapshot.mode") + .into(); let store_dir = ParseEnvResult::parse_simple( &mut emitter, env, @@ -634,19 +636,12 @@ impl FromEnv for SnapshotPartial { "snapshot.store_dir", ) .into(); - let creation_enabled = ParseEnvResult::parse_simple( - &mut emitter, - env, - "SNAPSHOT_CREATION_ENABLED", - "snapshot.creation_enabled", - ) - .into(); emitter.finish()?; Ok(Self { + mode, store_dir, - creation_enabled, ..Self::default() }) } diff --git a/config/src/snapshot.rs b/config/src/snapshot.rs new file mode 100644 index 00000000000..860fe6398ce --- /dev/null +++ b/config/src/snapshot.rs @@ -0,0 +1,36 @@ +//! Configuration related to Snapshot specifically + +/// Functioning mode of the Snapshot Iroha module +#[derive( + Copy, + Clone, + Debug, + Default, + strum::Display, + strum::EnumString, + serde_with::SerializeDisplay, + serde_with::DeserializeFromStr, +)] +#[strum(serialize_all = "snake_case")] +pub enum Mode { + /// Read the snapshot on startup, update periodically + #[default] + ReadWrite, + /// Read the snapshot on startup, do not update + Readonly, + /// Do not read or write the snapshot + Disabled, +} + +#[cfg(test)] +mod tests { + use crate::snapshot::Mode; + + #[test] + fn mode_display_form() { + assert_eq!( + format!("{} {} {}", Mode::ReadWrite, Mode::Readonly, Mode::Disabled), + "read_write readonly disabled" + ); + } +} diff --git a/config/tests/fixtures.rs b/config/tests/fixtures.rs index 1933fd5844b..318d46c37dd 100644 --- a/config/tests/fixtures.rs +++ b/config/tests/fixtures.rs @@ -123,9 +123,9 @@ fn minimal_config_snapshot() -> Result<()> { future_threshold: 1s, }, snapshot: Snapshot { + mode: ReadWrite, create_every: 60s, store_dir: "./storage/snapshot", - creation_enabled: true, }, telemetry: None, dev_telemetry: None, @@ -367,13 +367,13 @@ fn full_envs_set_is_consumed() -> Result<()> { future_threshold: None, }, snapshot: SnapshotPartial { + mode: Some( + ReadWrite, + ), create_every: None, store_dir: Some( "/snapshot/path/from/env", ), - creation_enabled: Some( - false, - ), }, telemetry: TelemetryPartial { name: None, diff --git a/config/tests/fixtures/full.env b/config/tests/fixtures/full.env index 149d222de13..b839709b1e1 100644 --- a/config/tests/fixtures/full.env +++ b/config/tests/fixtures/full.env @@ -12,5 +12,5 @@ KURA_STORE_DIR=/store/path/from/env KURA_DEBUG_OUTPUT_NEW_BLOCKS=false LOG_LEVEL=DEBUG LOG_FORMAT=pretty +SNAPSHOT_MODE=read_write SNAPSHOT_STORE_DIR=/snapshot/path/from/env -SNAPSHOT_CREATION_ENABLED=false diff --git a/config/tests/fixtures/full.toml b/config/tests/fixtures/full.toml index fd85a0f700d..f38ad0e38ef 100644 --- a/config/tests/fixtures/full.toml +++ b/config/tests/fixtures/full.toml @@ -47,7 +47,7 @@ transaction_time_to_live = 100 future_threshold = 50 [snapshot] -creation_enabled = true +mode = "read_write" create_every = 60_000 store_dir = "./storage/snapshot" @@ -64,7 +64,7 @@ out_file = "./dev-telemetry.json5" max_transactions_in_block = 512 block_time = 2_000 commit_time = 4_000 -transaction_limits = {max_instruction_number = 4096, max_wasm_size_bytes = 4194304 } +transaction_limits = { max_instruction_number = 4096, max_wasm_size_bytes = 4194304 } asset_metadata_limits = { capacity = 1048576, max_entry_len = 4096 } asset_definition_metadata_limits = { capacity = 1048576, max_entry_len = 4096 } account_metadata_limits = { capacity = 1048576, max_entry_len = 4096 } diff --git a/configs/peer.template.toml b/configs/peer.template.toml index 855e44c6c0b..bc01942940e 100644 --- a/configs/peer.template.toml +++ b/configs/peer.template.toml @@ -52,7 +52,7 @@ # future_threshold = "1s" [snapshot] -# creation_enabled = true +# mode = "read_write" # create_every = "1min" # store_dir = "./storage/snapshot" diff --git a/core/benches/kura.rs b/core/benches/kura.rs index 7d43b24b155..e8e0e6b75c5 100644 --- a/core/benches/kura.rs +++ b/core/benches/kura.rs @@ -41,7 +41,7 @@ async fn measure_block_size_for_n_executors(n_executors: u32) { .expect("Failed to accept Transaction."); let dir = tempfile::tempdir().expect("Could not create tempfile."); let cfg = Config { - init_mode: iroha_config::kura::Mode::Strict, + init_mode: iroha_config::kura::InitMode::Strict, debug_output_new_blocks: false, store_dir: dir.path().to_path_buf(), }; diff --git a/core/src/kura.rs b/core/src/kura.rs index c70e5557323..3dc536f9c2d 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -10,7 +10,7 @@ use std::{ sync::Arc, }; -use iroha_config::{kura::Mode, parameters::actual::Kura as Config}; +use iroha_config::{kura::InitMode, parameters::actual::Kura as Config}; use iroha_crypto::{Hash, HashOf}; use iroha_data_model::block::SignedBlock; use iroha_logger::prelude::*; @@ -31,7 +31,7 @@ const SIZE_OF_BLOCK_HASH: u64 = Hash::LENGTH as u64; #[derive(Debug)] pub struct Kura { /// The mode of initialisation of [`Kura`]. - mode: Mode, + mode: InitMode, /// The block storage block_store: Mutex, /// The array of block hashes and a slot for an arc of the block. This is normally recovered from the index file. @@ -71,7 +71,7 @@ impl Kura { /// for in-memory blocks only. pub fn blank_kura_for_testing() -> Arc { Arc::new(Self { - mode: Mode::Strict, + mode: InitMode::Strict, block_store: Mutex::new(BlockStore::new(PathBuf::new(), LockStatus::Locked)), block_data: Mutex::new(Vec::new()), block_plain_text_path: None, @@ -112,11 +112,13 @@ impl Kura { .expect("We don't have 4 billion blocks."); let block_hashes = match self.mode { - Mode::Fast => Kura::init_fast_mode(&block_store, block_index_count).or_else(|error| { - warn!(%error, "Hashes file is broken. Falling back to strict init mode."); - Kura::init_strict_mode(&mut block_store, block_index_count) - }), - Mode::Strict => Kura::init_strict_mode(&mut block_store, block_index_count), + InitMode::Fast => { + Kura::init_fast_mode(&block_store, block_index_count).or_else(|error| { + warn!(%error, "Hashes file is broken. Falling back to strict init mode."); + Kura::init_strict_mode(&mut block_store, block_index_count) + }) + } + InitMode::Strict => Kura::init_strict_mode(&mut block_store, block_index_count), }?; let block_count = block_hashes.len(); @@ -1047,7 +1049,7 @@ mod tests { async fn strict_init_kura() { let temp_dir = TempDir::new().unwrap(); Kura::new(&Config { - init_mode: Mode::Strict, + init_mode: InitMode::Strict, store_dir: temp_dir.path().to_str().unwrap().into(), debug_output_new_blocks: false, }) diff --git a/core/src/queue.rs b/core/src/queue.rs index b31171e6f71..195dd81ddcc 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -352,7 +352,8 @@ impl Queue { } #[cfg(test)] -mod tests { +// this is `pub` to re-use internal utils +pub mod tests { use std::{str::FromStr, sync::Arc, thread, time::Duration}; use iroha_data_model::{prelude::*, transaction::TransactionLimits}; diff --git a/core/src/snapshot.rs b/core/src/snapshot.rs index 22e7e3762b9..ec83e0f0da4 100644 --- a/core/src/snapshot.rs +++ b/core/src/snapshot.rs @@ -6,7 +6,7 @@ use std::{ time::Duration, }; -use iroha_config::parameters::actual::Snapshot as Config; +use iroha_config::{parameters::actual::Snapshot as Config, snapshot::Mode}; use iroha_crypto::HashOf; use iroha_data_model::block::SignedBlock; use iroha_logger::prelude::*; @@ -25,8 +25,8 @@ const SNAPSHOT_FILE_NAME: &str = "snapshot.data"; /// Name of the temporary [`WorldStateView`] snapshot file. const SNAPSHOT_TMP_FILE_NAME: &str = "snapshot.tmp"; -/// Errors produced by [`SnapshotMaker`] actor. -pub type Result = core::result::Result; +// /// Errors produced by [`SnapshotMaker`] actor. +// pub type Result = core::result::Result; /// [`SnapshotMaker`] actor handle. #[derive(Clone)] @@ -39,11 +39,9 @@ pub struct SnapshotMakerHandle { pub struct SnapshotMaker { sumeragi: SumeragiHandle, /// Frequency at which snapshot is made - snapshot_create_every: Duration, + create_every: Duration, /// Path to the directory where snapshots are stored - snapshot_dir: PathBuf, - /// Flag to enable/disable snapshot creation - snapshot_creation_enabled: bool, + store_dir: PathBuf, /// Flag to signal that new wsv is available for taking snapshot new_wsv_available: bool, } @@ -52,11 +50,8 @@ impl SnapshotMaker { /// Start [`Self`] actor. pub fn start(self) -> SnapshotMakerHandle { let (message_sender, message_receiver) = mpsc::channel(1); - if self.snapshot_creation_enabled { - tokio::task::spawn(self.run(message_receiver)); - } else { - iroha_logger::info!("Snapshot creation is disabled"); - } + tokio::task::spawn(self.run(message_receiver)); + SnapshotMakerHandle { _message_sender: message_sender, } @@ -64,7 +59,7 @@ impl SnapshotMaker { /// [`Self`] task. async fn run(mut self, mut message_receiver: mpsc::Receiver<()>) { - let mut snapshot_create_every = tokio::time::interval(self.snapshot_create_every); + let mut snapshot_create_every = tokio::time::interval(self.create_every); // Don't try to create snapshot more frequently if previous take longer time snapshot_create_every.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); @@ -92,81 +87,70 @@ impl SnapshotMaker { /// Invoke snapshot creation task async fn create_snapshot(&mut self) { let sumeragi = self.sumeragi.clone(); - let path_to_snapshot = self.snapshot_dir.clone(); - let handle = tokio::task::spawn_blocking(move || -> Result { + let store_dir = self.store_dir.clone(); + let handle = tokio::task::spawn_blocking(move || -> Result { sumeragi.apply_finalized_wsv(|wsv| { - Self::try_write_snapshot(wsv, &path_to_snapshot)?; + try_write_snapshot(wsv, store_dir)?; Ok(wsv.height()) }) }); match handle.await { Ok(Ok(at_height)) => { - iroha_logger::info!(at_height, "Snapshot for wsv was created successfully."); + iroha_logger::info!(at_height, "Successfully created a snapshot of WSV"); self.new_wsv_available = false; } Ok(Err(error)) => { - iroha_logger::error!(%error, "Failed to create snapshot for wsv."); + iroha_logger::error!(%error, "Failed to create a snapshot of WSV"); } Err(panic) => { - iroha_logger::error!(%panic, "Task panicked during creation of wsv snapshot."); + iroha_logger::error!(%panic, "Task panicked during creation of WSV snapshot"); } } } - /// Serialize and write snapshot to file, - /// overwriting any previously stored data. + /// Create from [`Config`]. /// - /// # Errors - /// - IO errors - /// - Serialization errors - fn try_write_snapshot(wsv: &WorldStateView, snapshot_dir: impl AsRef) -> Result<()> { - let path_to_file = snapshot_dir.as_ref().join(SNAPSHOT_FILE_NAME); - let path_to_tmp_file = snapshot_dir.as_ref().join(SNAPSHOT_TMP_FILE_NAME); - let file = std::fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(&path_to_tmp_file) - .map_err(|err| Error::IO(err, path_to_tmp_file.clone()))?; - let mut serializer = serde_json::Serializer::new(file); - wsv.serialize(&mut serializer)?; - std::fs::rename(path_to_tmp_file, &path_to_file) - .map_err(|err| Error::IO(err, path_to_file.clone()))?; - Ok(()) - } - - /// Create [`Self`] from [`Configuration`] - pub fn from_config(config: &Config, sumeragi: SumeragiHandle) -> Self { - Self { - sumeragi, - snapshot_create_every: config.create_every, - snapshot_dir: config.store_dir.clone(), - snapshot_creation_enabled: config.creation_enabled, - new_wsv_available: false, + /// Might return [`None`] if the configuration is not suitable for _making_ snapshots. + pub fn from_config(config: &Config, sumeragi: &SumeragiHandle) -> Option { + if let Mode::ReadWrite = config.mode { + Some(Self { + sumeragi: sumeragi.clone(), + create_every: config.create_every, + store_dir: config.store_dir.clone(), + new_wsv_available: false, + }) + } else { + None } } } -/// Try deserialize [`WorldStateView`] from snapshot file +/// Try to deserialize [`WorldStateView`] from a snapshot file. /// /// # Errors /// - IO errors /// - Deserialization errors pub fn try_read_snapshot( - snapshot_dir: impl AsRef, + store_dir: impl AsRef, kura: &Arc, query_handle: LiveQueryStoreHandle, BlockCount(block_count): BlockCount, -) -> Result { +) -> Result { let mut bytes = Vec::new(); - let path = snapshot_dir.as_ref().join(SNAPSHOT_FILE_NAME); - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(&path) - .map_err(|err| Error::IO(err, path.clone()))?; + let path = store_dir.as_ref().join(SNAPSHOT_FILE_NAME); + let mut file = match std::fs::OpenOptions::new().read(true).open(&path) { + Ok(file) => file, + Err(err) => { + return if err.kind() == std::io::ErrorKind::NotFound { + Err(TryReadError::NotFound) + } else { + Err(TryReadError::IO(err, path.clone())) + } + } + }; file.read_to_end(&mut bytes) - .map_err(|err| Error::IO(err, path.clone()))?; + .map_err(|err| TryReadError::IO(err, path.clone()))?; let mut deserializer = serde_json::Deserializer::from_slice(&bytes); let seed = KuraSeed { kura: Arc::clone(kura), @@ -175,7 +159,7 @@ pub fn try_read_snapshot( let wsv = seed.deserialize(&mut deserializer)?; let snapshot_height = wsv.block_hashes.len(); if snapshot_height > block_count { - return Err(Error::MismatchedHeight { + return Err(TryReadError::MismatchedHeight { snapshot_height, kura_height: block_count, }); @@ -186,7 +170,7 @@ pub fn try_read_snapshot( .expect("Kura has height at least as large as wsv_height"); let snapshot_block_hash = wsv.block_hashes[height - 1]; if kura_block_hash != snapshot_block_hash { - return Err(Error::MismatchedHash { + return Err(TryReadError::MismatchedHash { height, snapshot_block_hash, kura_block_hash, @@ -196,18 +180,47 @@ pub fn try_read_snapshot( Ok(wsv) } -/// Error variants for snapshot reading/writing logic +/// Serialize and write snapshot to file, +/// overwriting any previously stored data. +/// +/// # Errors +/// - IO errors +/// - Serialization errors +fn try_write_snapshot( + wsv: &WorldStateView, + store_dir: impl AsRef, +) -> Result<(), TryWriteError> { + std::fs::create_dir_all(store_dir.as_ref()) + .map_err(|err| TryWriteError::IO(err, store_dir.as_ref().to_path_buf()))?; + let path_to_file = store_dir.as_ref().join(SNAPSHOT_FILE_NAME); + let path_to_tmp_file = store_dir.as_ref().join(SNAPSHOT_TMP_FILE_NAME); + let file = std::fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&path_to_tmp_file) + .map_err(|err| TryWriteError::IO(err, path_to_tmp_file.clone()))?; + let mut serializer = serde_json::Serializer::new(file); + wsv.serialize(&mut serializer)?; + std::fs::rename(path_to_tmp_file, &path_to_file) + .map_err(|err| TryWriteError::IO(err, path_to_file.clone()))?; + Ok(()) +} + +/// Error variants for snapshot reading #[derive(thiserror::Error, Debug, displaydoc::Display)] -pub enum Error { +pub enum TryReadError { + /// The snapshot was not found + NotFound, /// Failed reading/writing {1:?} from disk IO(#[source] std::io::Error, PathBuf), - /// Error (de)serializing [`WorldStateView`] snapshot + /// Error (de)serializing World State View snapshot Serialization(#[from] serde_json::Error), /// Snapshot is in a non-consistent state. Snapshot has greater height ({snapshot_height}) than kura block store ({kura_height}) MismatchedHeight { - /// Amount of block hashes stored by snapshot + /// The amount of block hashes stored by snapshot snapshot_height: usize, - /// Amount of blocks stored by [`Kura`] + /// The amount of blocks stored by [`Kura`] kura_height: usize, }, /// Snapshot is in a non-consistent state. Hash of the block at height {height} is different between snapshot ({snapshot_block_hash}) and kura ({kura_block_hash}) @@ -220,3 +233,106 @@ pub enum Error { kura_block_hash: HashOf, }, } + +/// Error variants for snapshot writing +#[derive(thiserror::Error, Debug, displaydoc::Display)] +enum TryWriteError { + /// Failed reading/writing {1:?} from disk + IO(#[source] std::io::Error, PathBuf), + /// Error (de)serializing World State View snapshot + Serialization(#[from] serde_json::Error), +} + +#[cfg(test)] +mod tests { + use std::{fs::File, io::Write}; + + use iroha_crypto::KeyPair; + use tempfile::tempdir; + use tokio::test; + + use super::*; + use crate::query::store::LiveQueryStore; + + fn wsv_factory() -> WorldStateView { + let alice_key = KeyPair::random(); + let kura = Kura::blank_kura_for_testing(); + let query_handle = LiveQueryStore::test().start(); + WorldStateView::new( + crate::queue::tests::world_with_test_domains([alice_key.public_key().clone()]), + kura, + query_handle, + ) + } + + #[test] + async fn creates_all_dirs_while_writing_snapshots() { + let tmp_root = tempdir().unwrap(); + let snapshot_store_dir = tmp_root.path().join("path/to/snapshot/dir"); + let wsv = wsv_factory(); + + try_write_snapshot(&wsv, &snapshot_store_dir).unwrap(); + + assert!(Path::exists(snapshot_store_dir.as_path())) + } + + #[test] + async fn can_read_snapshot_after_writing() { + let tmp_root = tempdir().unwrap(); + let store_dir = tmp_root.path().join("snapshot"); + let wsv = wsv_factory(); + + try_write_snapshot(&wsv, &store_dir).unwrap(); + let _wsv = try_read_snapshot( + &store_dir, + &Kura::blank_kura_for_testing(), + LiveQueryStore::test().start(), + BlockCount(usize::try_from(wsv.height()).unwrap()), + ) + .unwrap(); + } + + #[test] + async fn cannot_find_snapshot_on_read_is_not_found() { + let tmp_root = tempdir().unwrap(); + let store_dir = tmp_root.path().join("snapshot"); + + let Err(error) = try_read_snapshot( + store_dir, + &Kura::blank_kura_for_testing(), + LiveQueryStore::test().start(), + BlockCount(15), + ) else { + panic!("should not be ok") + }; + + assert!(matches!(error, TryReadError::NotFound)); + } + + #[test] + async fn cannot_parse_snapshot_on_read_is_error() { + let tmp_root = tempdir().unwrap(); + let store_dir = tmp_root.path().join("snapshot"); + std::fs::create_dir(&store_dir).unwrap(); + { + let mut file = File::create(store_dir.join(SNAPSHOT_FILE_NAME)).unwrap(); + file.write_all(&[1, 4, 1, 2, 3, 4, 1, 4]).unwrap(); + } + + let Err(error) = try_read_snapshot( + &store_dir, + &Kura::blank_kura_for_testing(), + LiveQueryStore::test().start(), + BlockCount(15), + ) else { + panic!("should not be ok") + }; + + assert_eq!( + format!("{error}"), + "Error (de)serializing World State View snapshot" + ); + } + + // TODO: test block count comparison +}