Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drop column primitive #210

Merged
merged 6 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
compress::Compress,
db::{check::CheckDisplay, Operation, RcValue},
display::hex,
error::{Error, Result},
error::{try_io, Error, Result},
index::{Address, IndexTable, PlanOutcome, TableId as IndexTableId},
log::{Log, LogAction, LogOverlays, LogQuery, LogReader, LogWriter},
options::{ColumnOptions, Metadata, Options, DEFAULT_COMPRESSION_THRESHOLD},
Expand All @@ -20,6 +20,7 @@ use crate::{
};
use std::{
collections::VecDeque,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand Down Expand Up @@ -94,7 +95,7 @@ pub struct HashColumn {
col: ColId,
tables: RwLock<Tables>,
reindex: RwLock<Reindex>,
path: std::path::PathBuf,
path: PathBuf,
preimage: bool,
uniform_keys: bool,
collect_stats: bool,
Expand Down Expand Up @@ -325,7 +326,7 @@ impl Column {
}

fn open_table(
path: Arc<std::path::PathBuf>,
path: Arc<PathBuf>,
col: ColId,
tier: u8,
options: &ColumnOptions,
Expand All @@ -335,6 +336,29 @@ impl Column {
let entry_size = SIZES.get(tier as usize).cloned();
ValueTable::open(path, id, entry_size, options, db_version)
}

pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
	// `read_dir` gives no guarantees when entries are removed while the
	// iterator is live, so collect the matching file names first and only
	// delete once the directory scan is finished.
	let mut pending_removal = Vec::new();
	for entry in try_io!(std::fs::read_dir(&path)) {
		let entry = try_io!(entry);
		let entry_path = entry.path();
		let name = match entry_path.file_name().and_then(|f| f.to_str()) {
			Some(name) => name,
			None => continue,
		};
		// A file belongs to the column if it is one of the column's index
		// table files or one of its value table files.
		if crate::index::TableId::is_file_name(column, name) ||
			crate::table::TableId::is_file_name(column, name)
		{
			pending_removal.push(PathBuf::from(name));
		}
	}

	for name in pending_removal {
		try_io!(std::fs::remove_file(path.join(name)));
	}
	Ok(())
}
}

impl HashColumn {
Expand Down
138 changes: 127 additions & 11 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ impl DbInner {
Ok(())
}

fn replay_all_logs(&mut self) -> Result<()> {
fn replay_all_logs(&self) -> Result<()> {
while let Some(id) = self.log.replay_next()? {
log::debug!(target: "parity-db", "Replaying database log {}", id);
while self.enact_logs(true)? {}
Expand Down Expand Up @@ -920,7 +920,7 @@ impl Db {

fn open_inner(options: &Options, opening_mode: OpeningMode) -> Result<Db> {
assert!(options.is_valid());
let mut db = DbInner::open(options, opening_mode)?;
let db = DbInner::open(options, opening_mode)?;
// This needs to be call before log thread: so first reindexing
// will run in correct state.
if let Err(e) = db.replay_all_logs() {
Expand Down Expand Up @@ -1124,24 +1124,70 @@ impl Db {
self.inner.stats()
}

/// Add a new column with options specified by `new_column_options`.
pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
// We open the DB first to check metadata validity and to make sure there are
// no pending WAL logs, then extract the salt needed to rewrite the metadata.
fn precheck_column_operation(options: &mut Options) -> Result<[u8; 32]> {
	let salt = {
		// The DB handle is closed again as soon as this scope ends.
		let db = Db::open(options)?;
		db.inner.options.salt
	};
	Ok(salt.expect("`salt` is always `Some` after opening the DB; qed"))
}

/// Add a new column with options specified by `new_column_options`.
pub fn add_column(options: &mut Options, new_column_options: ColumnOptions) -> Result<()> {
	// Opening the DB validates the metadata and enacts any pending WAL logs
	// before we mutate the column configuration.
	let salt = Self::precheck_column_operation(options)?;

	options.columns.push(new_column_options);
	// Persist the extended column set, stamping the current metadata version.
	options.write_metadata_with_version(&options.path, &salt, Some(CURRENT_VERSION))?;

	Ok(())
}

/// Remove the last column from the database.
/// The database must be closed when this is called.
pub fn drop_last_column(options: &mut Options) -> Result<()> {
	let salt = Self::precheck_column_operation(options)?;
	// Nothing to drop when the column set is empty.
	if options.columns.is_empty() {
		return Ok(())
	}
	let last_index = (options.columns.len() - 1) as u8;
	Self::remove_column_files(options, last_index)?;
	options.columns.pop();
	options.write_metadata(&options.path, &salt)?;
	Ok(())
}

/// Truncate a column from the database, optionally changing its options.
/// The database must be closed when this is called.
pub fn reset_column(
	options: &mut Options,
	index: u8,
	new_options: Option<ColumnOptions>,
) -> Result<()> {
	let salt = Self::precheck_column_operation(options)?;
	Self::remove_column_files(options, index)?;

	// Metadata only needs rewriting when the column options change.
	match new_options {
		Some(new_options) => {
			options.columns[index as usize] = new_options;
			options.write_metadata(&options.path, &salt)
		},
		None => Ok(()),
	}
}

// Delete every table file belonging to column `index`, after validating
// that the column actually exists in the configuration.
fn remove_column_files(options: &mut Options, index: u8) -> Result<()> {
	if usize::from(index) >= options.columns.len() {
		return Err(Error::IncompatibleColumnConfig {
			id: index,
			reason: "Column not found".to_string(),
		})
	}
	Column::drop_files(index, options.path.clone())
}

#[cfg(feature = "instrumentation")]
pub fn process_reindex(&self) -> Result<()> {
self.inner.process_reindex()?;
Expand Down Expand Up @@ -1175,6 +1221,12 @@ impl Db {

impl Drop for Db {
	fn drop(&mut self) {
		// Shutdown logic is factored into `drop_inner` so it is also
		// callable outside of the `Drop` impl.
		self.drop_inner()
	}
}

impl Db {
fn drop_inner(&mut self) {
self.inner.shutdown();
if let Some(t) = self.log_thread.take() {
if let Err(e) = t.join() {
Expand Down Expand Up @@ -2622,4 +2674,68 @@ mod tests {
assert_eq!(db.inner.columns[0].index_bits(), Some(17));
}
}

#[test]
fn test_remove_column() {
	let tmp = tempdir().unwrap();
	// Two option sets over the same DB path: one driving the commit pipeline
	// stages manually, one using the standard pipeline. Both share a salt so
	// they address the same on-disk data.
	let db_test_file = EnableCommitPipelineStages::DbFile;
	let mut options_db_files = db_test_file.options(tmp.path(), 2);
	options_db_files.salt = Some(options_db_files.salt.unwrap_or_default());
	let mut options_std = EnableCommitPipelineStages::Standard.options(tmp.path(), 2);
	options_std.salt = options_db_files.salt.clone();

	let db = Db::open_inner(&options_db_files, OpeningMode::Create).unwrap();

	// Populate column 1 with 100 key/value pairs.
	let payload: Vec<(u8, _, _)> = (0u16..100)
		.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
		.collect();

	db.commit(payload.clone()).unwrap();

	db_test_file.run_stages(&db);
	drop(db);

	// Reopen and confirm every committed value is readable.
	let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
	for (col, key, value) in payload.iter() {
		assert_eq!(db.get(*col, key).unwrap().as_ref(), value.as_ref());
	}
	drop(db);
	// Reset column 1 without changing its options: its data must be gone.
	Db::reset_column(&mut options_db_files, 1, None).unwrap();

	let db = Db::open_inner(&options_db_files, OpeningMode::Write).unwrap();
	for (col, key, _value) in payload.iter() {
		assert_eq!(db.get(*col, key).unwrap(), None);
	}

	// Write a fresh, smaller payload into the truncated column.
	let payload: Vec<(u8, _, _)> = (0u16..10)
		.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
		.collect();

	db.commit(payload.clone()).unwrap();

	db_test_file.run_stages(&db);
	drop(db);

	let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
	let payload: Vec<(u8, _, _)> = (10u16..100)
		.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
		.collect();

	db.commit(payload.clone()).unwrap();
	// Without a btree index, iterating the column is not supported.
	assert!(db.iter(1).is_err());

	drop(db);

	// Reset column 1 again, this time enabling the btree index.
	let mut col_option = options_std.columns[1].clone();
	col_option.btree_index = true;
	Db::reset_column(&mut options_std, 1, Some(col_option)).unwrap();

	let db = Db::open_inner(&options_std, OpeningMode::Write).unwrap();
	let payload: Vec<(u8, _, _)> = (0u16..10)
		.map(|i| (1, i.to_le_bytes().to_vec(), Some(i.to_be_bytes().to_vec())))
		.collect();

	db.commit(payload.clone()).unwrap();
	// With the btree index enabled, iteration now succeeds.
	assert!(db.iter(1).is_ok());
}
}
2 changes: 1 addition & 1 deletion src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ impl Log {
Ok(false)
}

pub fn replay_next(&mut self) -> Result<Option<u32>> {
pub fn replay_next(&self) -> Result<Option<u32>> {
let mut reading = self.reading.write();
{
if let Some(reading) = reading.take() {
Expand Down
29 changes: 2 additions & 27 deletions src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
Error, Result,
};
/// Database migration.
use std::path::{Path, PathBuf};
use std::path::Path;

const COMMIT_SIZE: usize = 10240;
const OVERWRITE_TMP_PATH: &str = "to_revert_overwrite";
Expand Down Expand Up @@ -163,33 +163,8 @@ pub fn clear_column(path: &Path, column: ColId) -> Result<()> {
return Err(Error::Migration("Invalid column index".into()))
}

// Validate the database by opening. This also makes sure all the logs are enacted,
// so that after deleting a column there are no leftover commits that may write to it.
let mut options = Options::with_columns(path, meta.columns.len() as u8);
options.columns = meta.columns;
options.salt = Some(meta.salt);
let _db = Db::open(&options)?;
drop(_db);
crate::column::Column::drop_files(column, path.to_path_buf())?;

// It is not specified how read_dir behaves when deleting and iterating in the same loop
// We collect a list of paths to be deleted first.
let mut to_delete = Vec::new();
for entry in try_io!(std::fs::read_dir(path)) {
let entry = try_io!(entry);
if let Some(file) = entry.path().file_name().and_then(|f| f.to_str()) {
if crate::index::TableId::is_file_name(column, file) ||
crate::table::TableId::is_file_name(column, file)
{
to_delete.push(PathBuf::from(file));
}
}
}

for file in to_delete {
let mut path = path.to_path_buf();
path.push(file);
try_io!(std::fs::remove_file(path));
}
Ok(())
}

Expand Down