diff --git a/Cargo.lock b/Cargo.lock index d7d1baa2730d..7db07970d585 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5660,9 +5660,9 @@ dependencies = [ [[package]] name = "parity-db" -version = "0.3.13" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55a7901b85874402471e131de3332dde0e51f38432c69a3853627c8e25433048" +checksum = "2bb474d0ed0836e185cb998a6b140ed1073d1fbf27d690ecf9ede8030289382c" dependencies = [ "blake2-rfc", "crc32fast", @@ -7228,6 +7228,7 @@ dependencies = [ "sp-transaction-pool", "sp-trie", "substrate-prometheus-endpoint", + "tempfile", "thiserror", "tracing-gum", "westend-runtime", diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 59bf3b5204a8..f12f4568e27b 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -69,7 +69,7 @@ serde_json = "1.0.81" thiserror = "1.0.31" kvdb = "0.11.0" kvdb-rocksdb = { version = "0.15.2", optional = true } -parity-db = { version = "0.3.13", optional = true } +parity-db = { version = "0.3.16", optional = true } async-trait = "0.1.53" lru = "0.7" @@ -128,6 +128,7 @@ polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } env_logger = "0.9.0" log = "0.4.17" assert_matches = "1.5.0" +tempfile = "3.2" [features] default = ["db", "full-node", "polkadot-native"] diff --git a/node/service/src/parachains_db/mod.rs b/node/service/src/parachains_db/mod.rs index f6f6864a0e78..de12a8ac1a32 100644 --- a/node/service/src/parachains_db/mod.rs +++ b/node/service/src/parachains_db/mod.rs @@ -21,20 +21,25 @@ use { #[cfg(feature = "full-node")] mod upgrade; +const LOG_TARGET: &str = "parachain::db"; + #[cfg(any(test, feature = "full-node"))] pub(crate) mod columns { pub mod v0 { pub const NUM_COLUMNS: u32 = 3; } - pub const NUM_COLUMNS: u32 = 5; - - pub const COL_AVAILABILITY_DATA: u32 = 0; - pub const COL_AVAILABILITY_META: u32 = 1; - pub const COL_APPROVAL_DATA: u32 = 2; - pub const COL_CHAIN_SELECTION_DATA: u32 = 3; - pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4; - pub const ORDERED_COL: &[u32] = - &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA]; + + pub mod v1 { + pub const NUM_COLUMNS: u32 = 5; + + pub const COL_AVAILABILITY_DATA: u32 = 0; + pub const COL_AVAILABILITY_META: u32 = 1; + pub const COL_APPROVAL_DATA: u32 = 2; + pub const COL_CHAIN_SELECTION_DATA: u32 = 3; + pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4; + pub const ORDERED_COL: &[u32] = + &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA]; + } } /// Columns used by different subsystems. @@ -56,13 +61,19 @@ pub struct ColumnsConfig { /// The real columns used by the parachains DB. #[cfg(any(test, feature = "full-node"))] pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { - col_availability_data: columns::COL_AVAILABILITY_DATA, - col_availability_meta: columns::COL_AVAILABILITY_META, - col_approval_data: columns::COL_APPROVAL_DATA, - col_chain_selection_data: columns::COL_CHAIN_SELECTION_DATA, - col_dispute_coordinator_data: columns::COL_DISPUTE_COORDINATOR_DATA, + col_availability_data: columns::v1::COL_AVAILABILITY_DATA, + col_availability_meta: columns::v1::COL_AVAILABILITY_META, + col_approval_data: columns::v1::COL_APPROVAL_DATA, + col_chain_selection_data: columns::v1::COL_CHAIN_SELECTION_DATA, + col_dispute_coordinator_data: columns::v1::COL_DISPUTE_COORDINATOR_DATA, }; +#[derive(PartialEq)] +pub(crate) enum DatabaseKind { + ParityDB, + RocksDB, +} + /// The cache size for each column, in megabytes. #[derive(Debug, Clone)] pub struct CacheSizes { @@ -95,27 +106,29 @@ pub fn open_creating_rocksdb( let path = root.join("parachains").join("db"); - let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS); + let mut db_config = DatabaseConfig::with_columns(columns::v1::NUM_COLUMNS); let _ = db_config .memory_budget - .insert(columns::COL_AVAILABILITY_DATA, cache_sizes.availability_data); + .insert(columns::v1::COL_AVAILABILITY_DATA, cache_sizes.availability_data); let _ = db_config .memory_budget - .insert(columns::COL_AVAILABILITY_META, cache_sizes.availability_meta); + .insert(columns::v1::COL_AVAILABILITY_META, cache_sizes.availability_meta); let _ = db_config .memory_budget - .insert(columns::COL_APPROVAL_DATA, cache_sizes.approval_data); + .insert(columns::v1::COL_APPROVAL_DATA, cache_sizes.approval_data); let path_str = path .to_str() .ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?; std::fs::create_dir_all(&path_str)?; - upgrade::try_upgrade_db(&path)?; + upgrade::try_upgrade_db(&path, DatabaseKind::RocksDB)?; let db = Database::open(&db_config, &path_str)?; - let db = - polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, columns::ORDERED_COL); + let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new( + db, + columns::v1::ORDERED_COL, + ); Ok(Arc::new(db)) } @@ -132,18 +145,14 @@ pub fn open_creating_paritydb( .ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?; std::fs::create_dir_all(&path_str)?; + upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?; - let mut options = parity_db::Options::with_columns(&path, columns::NUM_COLUMNS as u8); - for i in columns::ORDERED_COL { - options.columns[*i as usize].btree_index = true; - } - - let db = parity_db::Db::open_or_create(&options) + let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_1_config(&path)) .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new( db, - columns::ORDERED_COL, + columns::v1::ORDERED_COL, ); Ok(Arc::new(db)) } diff --git a/node/service/src/parachains_db/upgrade.rs b/node/service/src/parachains_db/upgrade.rs index 0ba84103885f..ad995f41ed82 100644 --- a/node/service/src/parachains_db/upgrade.rs +++ b/node/service/src/parachains_db/upgrade.rs @@ -15,6 +15,7 @@ #![cfg(feature = "full-node")] +use super::{columns, other_io_error, DatabaseKind, LOG_TARGET}; use std::{ fs, io, path::{Path, PathBuf}, @@ -49,13 +50,23 @@ impl From for io::Error { } /// Try upgrading parachain's database to the current version. -pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> { +pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<(), Error> { let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none()); if !is_empty { - match current_version(db_path)? { - 0 => migrate_from_version_0_to_1(db_path)?, - CURRENT_VERSION => (), - v => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }), + match get_db_version(db_path)? { + // 0 -> 1 migration + Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?, + // Already at current version, do nothing. + Some(CURRENT_VERSION) => (), + // This is an arbitrary future version, we don't handle it. + Some(v) => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }), + // No version file. For `RocksDB` we dont need to do anything. + None if db_kind == DatabaseKind::RocksDB => (), + // No version file. `ParityDB` did not previously have a version defined. + // We handle this as a `0 -> 1` migration. + None if db_kind == DatabaseKind::ParityDB => + migrate_from_version_0_to_1(db_path, db_kind)?, + None => unreachable!(), } } @@ -63,12 +74,14 @@ pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> { } /// Reads current database version from the file at given path. -/// If the file does not exist, assumes the current version. -fn current_version(path: &Path) -> Result { +/// If the file does not exist returns `None`, otherwise the version stored in the file. +fn get_db_version(path: &Path) -> Result, Error> { match fs::read_to_string(version_file_path(path)) { - Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(CURRENT_VERSION), + Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(None), Err(err) => Err(err.into()), - Ok(content) => u32::from_str(&content).map_err(|_| Error::CorruptedVersionFile), + Ok(content) => u32::from_str(&content) + .map(|v| Some(v)) + .map_err(|_| Error::CorruptedVersionFile), } } @@ -86,9 +99,22 @@ fn version_file_path(path: &Path) -> PathBuf { file_path } +fn migrate_from_version_0_to_1(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> { + gum::info!(target: LOG_TARGET, "Migrating parachains db from version 0 to version 1 ..."); + + match db_kind { + DatabaseKind::ParityDB => paritydb_migrate_from_version_0_to_1(path), + DatabaseKind::RocksDB => rocksdb_migrate_from_version_0_to_1(path), + } + .and_then(|result| { + gum::info!(target: LOG_TARGET, "Migration complete! "); + Ok(result) + }) +} + /// Migration from version 0 to version 1: /// * the number of columns has changed from 3 to 5; -fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { +fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { use kvdb_rocksdb::{Database, DatabaseConfig}; let db_path = path @@ -102,3 +128,131 @@ fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { Ok(()) } + +// This currently clears columns which had their configs altered between versions. +// The columns to be changed are constrained by the `allowed_columns` vector. +fn paritydb_fix_columns( + path: &Path, + options: parity_db::Options, + allowed_columns: Vec, +) -> io::Result<()> { + // Figure out which columns to delete. This will be determined by inspecting + // the metadata file. + if let Some(metadata) = parity_db::Options::load_metadata(&path) + .map_err(|e| other_io_error(format!("Error reading metadata {:?}", e)))? + { + let columns_to_clear = metadata + .columns + .into_iter() + .enumerate() + .filter(|(idx, _)| allowed_columns.contains(&(*idx as u32))) + .filter_map(|(idx, opts)| { + let changed = opts != options.columns[idx]; + if changed { + gum::debug!( + target: LOG_TARGET, + "Column {} will be cleared. Old options: {:?}, New options: {:?}", + idx, + opts, + options.columns[idx] + ); + Some(idx) + } else { + None + } + }) + .collect::>(); + + if columns_to_clear.len() > 0 { + gum::debug!( + target: LOG_TARGET, + "Database column changes detected, need to cleanup {} columns.", + columns_to_clear.len() + ); + } + + for column in columns_to_clear { + gum::debug!(target: LOG_TARGET, "Clearing column {}", column,); + parity_db::clear_column(path, column.try_into().expect("Invalid column ID")) + .map_err(|e| other_io_error(format!("Error clearing column {:?}", e)))?; + } + + // Write the updated column options. + options + .write_metadata(path, &metadata.salt) + .map_err(|e| other_io_error(format!("Error writing metadata {:?}", e)))?; + } + + Ok(()) +} + +/// Database configuration for version 1. +pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options { + let mut options = + parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8); + for i in columns::v1::ORDERED_COL { + options.columns[*i as usize].btree_index = true; + } + + options +} + +/// Database configuration for version 0. This is useful just for testing. +#[cfg(test)] +pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options { + let mut options = + parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8); + options.columns[super::columns::v1::COL_AVAILABILITY_META as usize].btree_index = true; + options.columns[super::columns::v1::COL_CHAIN_SELECTION_DATA as usize].btree_index = true; + + options +} + +/// Migration from version 0 to version 1. +/// Cases covered: +/// - upgrading from v0.9.23 or earlier -> the `dispute coordinator column` was changed +/// - upgrading from v0.9.24+ -> this is a no op assuming the DB has been manually fixed as per +/// release notes +fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { + // Delete the `dispute coordinator` column if needed (if column configuration is changed). + paritydb_fix_columns( + path, + paritydb_version_1_config(path), + vec![super::columns::v1::COL_DISPUTE_COORDINATOR_DATA], + )?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + #[test] + fn test_paritydb_migrate_0_1() { + use super::{columns::v1::*, *}; + use parity_db::Db; + + let db_dir = tempfile::tempdir().unwrap(); + let path = db_dir.path(); + { + let db = Db::open_or_create(&paritydb_version_0_config(&path)).unwrap(); + + db.commit(vec![ + (COL_DISPUTE_COORDINATOR_DATA as u8, b"1234".to_vec(), Some(b"somevalue".to_vec())), + (COL_AVAILABILITY_META as u8, b"5678".to_vec(), Some(b"somevalue".to_vec())), + ]) + .unwrap(); + } + + try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap(); + + let db = Db::open(&paritydb_version_1_config(&path)).unwrap(); + assert_eq!( + db.get(super::columns::v1::COL_DISPUTE_COORDINATOR_DATA as u8, b"1234").unwrap(), + None + ); + assert_eq!( + db.get(super::columns::v1::COL_AVAILABILITY_META as u8, b"5678").unwrap(), + Some("somevalue".as_bytes().to_vec()) + ); + } +}