Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Improve handling of RocksDB corruption (#7630)
Browse files Browse the repository at this point in the history
* kvdb-rocksdb: update rust-rocksdb version

* kvdb-rocksdb: mark corruptions and attempt repair on db open

* kvdb-rocksdb: better corruption detection on open

* kvdb-rocksdb: add corruption_file_name const

* kvdb-rocksdb: rename mark_corruption to check_for_corruption
  • Loading branch information
andresilva authored and debris committed Jan 22, 2018
1 parent 97ed569 commit 82cb1e5
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 12 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 46 additions & 10 deletions util/kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::cmp;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::path::{PathBuf, Path};
use std::{mem, fs, io};
use std::{fs, io, mem, result};

use parking_lot::{Mutex, MutexGuard, RwLock};
use rocksdb::{
Expand Down Expand Up @@ -257,7 +257,25 @@ pub struct Database {
flushing_lock: Mutex<bool>,
}

#[inline]
fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, String>) -> result::Result<T, String> {
if let Err(ref s) = res {
if s.starts_with("Corruption:") {
warn!("DB corrupted: {}. Repair will be triggered on next restart", s);
let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME));
}
}

res
}

fn is_corrupted(s: &str) -> bool {
s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families")
}

impl Database {
const CORRUPTION_FILE_NAME: &'static str = "CORRUPTED";

/// Open database with default settings.
pub fn open_default(path: &str) -> Result<Database> {
Database::open(&DatabaseConfig::default(), path)
Expand Down Expand Up @@ -287,6 +305,14 @@ impl Database {
block_opts.set_cache(cache);
}

// attempt database repair if it has been previously marked as corrupted
let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
if db_corrupted.exists() {
warn!("DB has been previously marked as corrupted, attempting repair");
DB::repair(&opts, path)?;
fs::remove_file(db_corrupted)?;
}

let columns = config.columns.unwrap_or(0) as usize;

let mut cf_options = Vec::with_capacity(columns);
Expand All @@ -306,12 +332,11 @@ impl Database {

let mut cfs: Vec<Column> = Vec::new();
let db = match config.columns {
Some(columns) => {
Some(_) => {
match DB::open_cf(&opts, path, &cfnames, &cf_options) {
Ok(db) => {
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
assert!(cfs.len() == columns as usize);
Ok(db)
}
Err(_) => {
Expand All @@ -321,7 +346,7 @@ impl Database {
cfs = cfnames.iter().enumerate().map(|(i, n)| db.create_cf(n, &cf_options[i])).collect::<::std::result::Result<_, _>>()?;
Ok(db)
},
err @ Err(_) => err,
err => err,
}
}
}
Expand All @@ -331,14 +356,18 @@ impl Database {

let db = match db {
Ok(db) => db,
Err(ref s) if s.starts_with("Corruption:") => {
info!("{}", s);
info!("Attempting DB repair for {}", path);
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path)?;

match cfnames.is_empty() {
true => DB::open(&opts, path)?,
false => DB::open_cf(&opts, path, &cfnames, &cf_options)?
false => {
let db = DB::open_cf(&opts, path, &cfnames, &cf_options)?;
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
db
},
}
},
Err(s) => { return Err(s.into()); }
Expand Down Expand Up @@ -425,7 +454,11 @@ impl Database {
}
}
}
db.write_opt(batch, &self.write_opts)?;

check_for_corruption(
&self.path,
db.write_opt(batch, &self.write_opts))?;

for column in self.flushing.write().iter_mut() {
column.clear();
column.shrink_to_fit();
Expand Down Expand Up @@ -471,7 +504,10 @@ impl Database {
},
}
}
db.write_opt(batch, &self.write_opts).map_err(Into::into)

check_for_corruption(
&self.path,
db.write_opt(batch, &self.write_opts)).map_err(Into::into)
},
None => Err("Database is closed".into())
}
Expand Down

0 comments on commit 82cb1e5

Please sign in to comment.