Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Parachains db column "migration" (#5797)
Browse files Browse the repository at this point in the history
* Column migration for parityDB

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fmt

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Remove columns

* warn

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* bump paritydb

* use clear_column

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* logs

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* paritydb 0.3.16

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Update node/service/Cargo.toml

Co-authored-by: Andronik <write@reusable.software>

* ParityDB versioning

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* cargo lock

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review + proper version constants

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Add test

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

Co-authored-by: Andronik <write@reusable.software>
  • Loading branch information
sandreim and ordian authored Jul 27, 2022
1 parent 4a812f5 commit e3ea24b
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 41 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion node/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ serde_json = "1.0.81"
thiserror = "1.0.31"
kvdb = "0.11.0"
kvdb-rocksdb = { version = "0.15.2", optional = true }
parity-db = { version = "0.3.13", optional = true }
parity-db = { version = "0.3.16", optional = true }
async-trait = "0.1.53"
lru = "0.7"

Expand Down Expand Up @@ -128,6 +128,7 @@ polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
env_logger = "0.9.0"
log = "0.4.17"
assert_matches = "1.5.0"
tempfile = "3.2"

[features]
default = ["db", "full-node", "polkadot-native"]
Expand Down
65 changes: 37 additions & 28 deletions node/service/src/parachains_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,25 @@ use {
#[cfg(feature = "full-node")]
mod upgrade;

const LOG_TARGET: &str = "parachain::db";

#[cfg(any(test, feature = "full-node"))]
pub(crate) mod columns {
pub mod v0 {
pub const NUM_COLUMNS: u32 = 3;
}
pub const NUM_COLUMNS: u32 = 5;

pub const COL_AVAILABILITY_DATA: u32 = 0;
pub const COL_AVAILABILITY_META: u32 = 1;
pub const COL_APPROVAL_DATA: u32 = 2;
pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
pub const ORDERED_COL: &[u32] =
&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];

pub mod v1 {
pub const NUM_COLUMNS: u32 = 5;

pub const COL_AVAILABILITY_DATA: u32 = 0;
pub const COL_AVAILABILITY_META: u32 = 1;
pub const COL_APPROVAL_DATA: u32 = 2;
pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
pub const ORDERED_COL: &[u32] =
&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
}
}

/// Columns used by different subsystems.
Expand All @@ -56,13 +61,19 @@ pub struct ColumnsConfig {
/// The real columns used by the parachains DB.
#[cfg(any(test, feature = "full-node"))]
pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig {
col_availability_data: columns::COL_AVAILABILITY_DATA,
col_availability_meta: columns::COL_AVAILABILITY_META,
col_approval_data: columns::COL_APPROVAL_DATA,
col_chain_selection_data: columns::COL_CHAIN_SELECTION_DATA,
col_dispute_coordinator_data: columns::COL_DISPUTE_COORDINATOR_DATA,
col_availability_data: columns::v1::COL_AVAILABILITY_DATA,
col_availability_meta: columns::v1::COL_AVAILABILITY_META,
col_approval_data: columns::v1::COL_APPROVAL_DATA,
col_chain_selection_data: columns::v1::COL_CHAIN_SELECTION_DATA,
col_dispute_coordinator_data: columns::v1::COL_DISPUTE_COORDINATOR_DATA,
};

/// The database backend used for the parachains DB.
///
/// Passed to the migration code so it can pick the backend-specific
/// upgrade path (the two backends store and migrate columns differently).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DatabaseKind {
	ParityDB,
	RocksDB,
}

/// The cache size for each column, in megabytes.
#[derive(Debug, Clone)]
pub struct CacheSizes {
Expand Down Expand Up @@ -95,27 +106,29 @@ pub fn open_creating_rocksdb(

let path = root.join("parachains").join("db");

let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);
let mut db_config = DatabaseConfig::with_columns(columns::v1::NUM_COLUMNS);

let _ = db_config
.memory_budget
.insert(columns::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
.insert(columns::v1::COL_AVAILABILITY_DATA, cache_sizes.availability_data);
let _ = db_config
.memory_budget
.insert(columns::COL_AVAILABILITY_META, cache_sizes.availability_meta);
.insert(columns::v1::COL_AVAILABILITY_META, cache_sizes.availability_meta);
let _ = db_config
.memory_budget
.insert(columns::COL_APPROVAL_DATA, cache_sizes.approval_data);
.insert(columns::v1::COL_APPROVAL_DATA, cache_sizes.approval_data);

let path_str = path
.to_str()
.ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?;

std::fs::create_dir_all(&path_str)?;
upgrade::try_upgrade_db(&path)?;
upgrade::try_upgrade_db(&path, DatabaseKind::RocksDB)?;
let db = Database::open(&db_config, &path_str)?;
let db =
polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, columns::ORDERED_COL);
let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(
db,
columns::v1::ORDERED_COL,
);

Ok(Arc::new(db))
}
Expand All @@ -132,18 +145,14 @@ pub fn open_creating_paritydb(
.ok_or_else(|| other_io_error(format!("Bad database path: {:?}", path)))?;

std::fs::create_dir_all(&path_str)?;
upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?;

let mut options = parity_db::Options::with_columns(&path, columns::NUM_COLUMNS as u8);
for i in columns::ORDERED_COL {
options.columns[*i as usize].btree_index = true;
}

let db = parity_db::Db::open_or_create(&options)
let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_1_config(&path))
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;

let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new(
db,
columns::ORDERED_COL,
columns::v1::ORDERED_COL,
);
Ok(Arc::new(db))
}
174 changes: 164 additions & 10 deletions node/service/src/parachains_db/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#![cfg(feature = "full-node")]

use super::{columns, other_io_error, DatabaseKind, LOG_TARGET};
use std::{
fs, io,
path::{Path, PathBuf},
Expand Down Expand Up @@ -49,26 +50,38 @@ impl From<Error> for io::Error {
}

/// Try upgrading parachain's database to the current version.
pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> {
pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none());
if !is_empty {
match current_version(db_path)? {
0 => migrate_from_version_0_to_1(db_path)?,
CURRENT_VERSION => (),
v => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }),
match get_db_version(db_path)? {
// 0 -> 1 migration
Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?,
// Already at current version, do nothing.
Some(CURRENT_VERSION) => (),
// This is an arbitrary future version, we don't handle it.
Some(v) => return Err(Error::FutureVersion { current: CURRENT_VERSION, got: v }),
// No version file. For `RocksDB` we don't need to do anything.
None if db_kind == DatabaseKind::RocksDB => (),
// No version file. `ParityDB` did not previously have a version defined.
// We handle this as a `0 -> 1` migration.
None if db_kind == DatabaseKind::ParityDB =>
migrate_from_version_0_to_1(db_path, db_kind)?,
None => unreachable!(),
}
}

update_version(db_path)
}

/// Reads current database version from the file at given path.
/// If the file does not exist, assumes the current version.
fn current_version(path: &Path) -> Result<Version, Error> {
/// If the file does not exist returns `None`, otherwise the version stored in the file.
fn get_db_version(path: &Path) -> Result<Option<Version>, Error> {
match fs::read_to_string(version_file_path(path)) {
Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(CURRENT_VERSION),
Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
Ok(content) => u32::from_str(&content).map_err(|_| Error::CorruptedVersionFile),
Ok(content) => u32::from_str(&content)
.map(|v| Some(v))
.map_err(|_| Error::CorruptedVersionFile),
}
}

Expand All @@ -86,9 +99,22 @@ fn version_file_path(path: &Path) -> PathBuf {
file_path
}

/// Dispatch the `0 -> 1` migration to the backend-specific implementation
/// selected by `db_kind`, logging start and successful completion.
fn migrate_from_version_0_to_1(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> {
	gum::info!(target: LOG_TARGET, "Migrating parachains db from version 0 to version 1 ...");

	// `.map` (not `.and_then` + `Ok`) is the idiomatic way to tack the
	// success log onto the result; errors pass through untouched.
	match db_kind {
		DatabaseKind::ParityDB => paritydb_migrate_from_version_0_to_1(path),
		DatabaseKind::RocksDB => rocksdb_migrate_from_version_0_to_1(path),
	}
	.map(|result| {
		gum::info!(target: LOG_TARGET, "Migration complete! ");
		result
	})
}

/// Migration from version 0 to version 1:
/// * the number of columns has changed from 3 to 5;
fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
use kvdb_rocksdb::{Database, DatabaseConfig};

let db_path = path
Expand All @@ -102,3 +128,131 @@ fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {

Ok(())
}

// This currently clears columns which had their configs altered between versions.
// The columns to be changed are constrained by the `allowed_columns` vector.
//
// A cleared column loses all of its data; callers are expected to only list
// columns whose contents are safe to drop (see the 0 -> 1 migration notes).
fn paritydb_fix_columns(
	path: &Path,
	options: parity_db::Options,
	allowed_columns: Vec<u32>,
) -> io::Result<()> {
	// Figure out which columns to delete. This will be determined by inspecting
	// the metadata file. If there is no metadata file the database was never
	// created, and there is nothing to fix.
	if let Some(metadata) = parity_db::Options::load_metadata(path)
		.map_err(|e| other_io_error(format!("Error reading metadata {:?}", e)))?
	{
		// A column needs clearing when its on-disk options differ from the
		// desired `options` supplied by the caller.
		let columns_to_clear = metadata
			.columns
			.into_iter()
			.enumerate()
			.filter(|(idx, _)| allowed_columns.contains(&(*idx as u32)))
			.filter_map(|(idx, opts)| {
				if opts != options.columns[idx] {
					gum::debug!(
						target: LOG_TARGET,
						"Column {} will be cleared. Old options: {:?}, New options: {:?}",
						idx,
						opts,
						options.columns[idx]
					);
					Some(idx)
				} else {
					None
				}
			})
			.collect::<Vec<_>>();

		if !columns_to_clear.is_empty() {
			gum::debug!(
				target: LOG_TARGET,
				"Database column changes detected, need to cleanup {} columns.",
				columns_to_clear.len()
			);
		}

		for column in columns_to_clear {
			gum::debug!(target: LOG_TARGET, "Clearing column {}", column);
			parity_db::clear_column(path, column.try_into().expect("Invalid column ID"))
				.map_err(|e| other_io_error(format!("Error clearing column {:?}", e)))?;
		}

		// Write the updated column options so subsequent opens see the new config.
		options
			.write_metadata(path, &metadata.salt)
			.map_err(|e| other_io_error(format!("Error writing metadata {:?}", e)))?;
	}

	Ok(())
}

/// Database configuration for version 1.
pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
	// `columns` is already imported from the parent module; use it consistently
	// rather than mixing `super::columns::v1` and `columns::v1` paths.
	let mut options = parity_db::Options::with_columns(path, columns::v1::NUM_COLUMNS as u8);
	// Columns in `ORDERED_COL` get a btree index so their keys can be
	// accessed in order; the rest stay hash-indexed.
	for i in columns::v1::ORDERED_COL {
		options.columns[*i as usize].btree_index = true;
	}

	options
}

/// Database configuration for version 0. This is useful just for testing.
#[cfg(test)]
pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options {
	let mut options =
		parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
	// In version 0 only these two columns carried a btree index.
	for col in [
		super::columns::v1::COL_AVAILABILITY_META,
		super::columns::v1::COL_CHAIN_SELECTION_DATA,
	] {
		options.columns[col as usize].btree_index = true;
	}

	options
}

/// Migration from version 0 to version 1.
/// Cases covered:
/// - upgrading from v0.9.23 or earlier -> the `dispute coordinator column` was changed
/// - upgrading from v0.9.24+ -> this is a no op assuming the DB has been manually fixed as per
/// release notes
fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
// Delete the `dispute coordinator` column if needed (if column configuration is changed).
paritydb_fix_columns(
path,
paritydb_version_1_config(path),
vec![super::columns::v1::COL_DISPUTE_COORDINATOR_DATA],
)?;

Ok(())
}

#[cfg(test)]
mod tests {
	#[test]
	fn test_paritydb_migrate_0_1() {
		use super::{columns::v1::*, *};
		use parity_db::Db;

		let tmp = tempfile::tempdir().unwrap();
		let path = tmp.path();

		// Populate a version-0 database: one entry in the column whose
		// configuration changes in version 1, one in a column that doesn't.
		{
			let db = Db::open_or_create(&paritydb_version_0_config(path)).unwrap();

			db.commit(vec![
				(COL_DISPUTE_COORDINATOR_DATA as u8, b"1234".to_vec(), Some(b"somevalue".to_vec())),
				(COL_AVAILABILITY_META as u8, b"5678".to_vec(), Some(b"somevalue".to_vec())),
			])
			.unwrap();
		}

		try_upgrade_db(path, DatabaseKind::ParityDB).unwrap();

		let db = Db::open(&paritydb_version_1_config(path)).unwrap();
		// The reconfigured column must have been cleared by the migration ...
		assert_eq!(db.get(COL_DISPUTE_COORDINATOR_DATA as u8, b"1234").unwrap(), None);
		// ... while the untouched column keeps its data.
		assert_eq!(
			db.get(COL_AVAILABILITY_META as u8, b"5678").unwrap(),
			Some(b"somevalue".to_vec())
		);
	}
}

0 comments on commit e3ea24b

Please sign in to comment.