Skip to content

Commit

Permalink
Add Rocksdb Secondary Instance Api (#384)
Browse files Browse the repository at this point in the history
* kvdb-rocksdb: update to new set_upper_bound API

* kvdb-rocksdb: update rocksdb to crates.io version

* kvdb-rocksdb: update the changelog

* Fix build? Set VM template.

* Fix build? correct image name

* Fix build? Maybe it's 2019?

* appveyor: try release build

* Revert "appveyor: try release build"

This reverts commit ace87ee.

* checkout rust-rocksdb/rust-rocksdb#412

* revert patch

* revert unrelated changes

* add open as secondary rocksdb api

* Update kvdb-rocksdb/src/lib.rs

Co-Authored-By: Andronik Ordian <write@reusable.software>

* add more information to secondary mode comment

* add function to catch up a secondary instance with a primary instance

* one more doc comment for more clarity

* style fixes

* Update kvdb-rocksdb/src/lib.rs

Co-Authored-By: David <dvdplm@gmail.com>

* Update kvdb-rocksdb/src/lib.rs

Co-Authored-By: David <dvdplm@gmail.com>

* change name of `secondary_mode` option to `secondary`

* Update kvdb-rocksdb/src/lib.rs

Co-Authored-By: David <dvdplm@gmail.com>

* fix some punctuation

* specify a different directory for secondary instance to store its logs

* Update kvdb-rocksdb/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* remove catching up on primary db in test

* doc comment fixes

expand on what `try_catch_up_with_secondary` does, since
it may have some implications on the primary instance of rocksdb
according to L503-566 in `db/db_impl/db_impl_secondary.cc` of facebook/rocksdb

* remove wrong info about blocking primary instance

* more docs for catch-up-with-primary

* grammar

* make `max_open_files` comment clearer

Co-authored-by: Andronik Ordian <write@reusable.software>
Co-authored-by: David Palm <dvdplm@gmail.com>
  • Loading branch information
3 people authored May 1, 2020
1 parent 50c3dc2 commit 990d45d
Showing 1 changed file with 138 additions and 15 deletions.
153 changes: 138 additions & 15 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ pub struct DatabaseConfig {
/// It can have a negative performance impact up to 10% according to
/// https://github.com/facebook/rocksdb/wiki/Statistics.
pub enable_statistics: bool,
/// Open the database as a secondary instance.
/// Specify a path for the secondary instance of the database.
/// Secondary instances are read-only and kept updated by tailing the rocksdb MANIFEST.
/// It is up to the user to call `catch_up_with_primary()` manually to update the secondary db.
/// Disabled by default.
///
/// `max_open_files` is overridden to always equal `-1`.
/// May have a negative performance impact on the secondary instance
/// if the secondary instance reads and applies state changes before the primary instance compacts them.
/// More info: https://github.com/facebook/rocksdb/wiki/Secondary-instance
pub secondary: Option<String>,
}

impl DatabaseConfig {
Expand Down Expand Up @@ -215,6 +226,7 @@ impl Default for DatabaseConfig {
columns: 1,
keep_log_file_num: 1,
enable_statistics: false,
secondary: None,
}
}
}
Expand Down Expand Up @@ -305,7 +317,11 @@ fn generate_options(config: &DatabaseConfig) -> Options {
}
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
if config.secondary.is_some() {
opts.set_max_open_files(-1)
} else {
opts.set_max_open_files(config.max_open_files);
}
opts.set_bytes_per_sync(1 * MB as u64);
opts.set_keep_log_file_num(1);
opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
Expand Down Expand Up @@ -364,12 +380,38 @@ impl Database {
}

let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();

let write_opts = WriteOptions::default();
let read_opts = generate_read_options();

let db = if let Some(secondary_path) = &config.secondary {
Self::open_secondary(&opts, path, secondary_path.as_str(), column_names.as_slice())?
} else {
let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
Self::open_primary(&opts, path, config, column_names.as_slice(), &block_opts)?
};

Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
path: path.to_owned(),
opts,
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
}

/// Internal api to open a database in primary mode.
fn open_primary(
opts: &Options,
path: &str,
config: &DatabaseConfig,
column_names: &[&str],
block_opts: &BlockBasedOptions,
) -> io::Result<rocksdb::DB> {
let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i)))
.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
.collect();

let db = match DB::open_cf_descriptors(&opts, path, cf_descriptors) {
Expand All @@ -390,31 +432,42 @@ impl Database {
ok => ok,
};

let db = match db {
Ok(match db {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path).map_err(other_io_err)?;

let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| {
ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i))
ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i))
})
.collect();

DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)?
}
Err(s) => return Err(other_io_err(s)),
};
Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
path: path.to_owned(),
opts,
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
}

/// Internal api to open a database in secondary mode.
///
/// A secondary database needs a separate path (`secondary_path`) to store its own logs;
/// `path` is the location of the database being followed.
///
/// If the first open attempt fails with an error that `is_corrupted` recognises,
/// a warning is logged, `DB::repair` is run on `path` and the open is retried once.
/// Any other failure is converted with `other_io_err` and returned.
fn open_secondary(
    opts: &Options,
    path: &str,
    secondary_path: &str,
    column_names: &[String],
) -> io::Result<rocksdb::DB> {
    // Fast path: return straight away on success or on a non-corruption error.
    let corruption = match DB::open_cf_as_secondary(opts, path, secondary_path, column_names) {
        Ok(handle) => return Ok(handle),
        Err(err) if is_corrupted(&err) => err,
        Err(err) => return Err(other_io_err(err)),
    };

    // Corruption detected: attempt a repair, then retry the open exactly once.
    warn!("DB corrupted: {}, attempting repair", corruption);
    DB::repair(opts, path).map_err(other_io_err)?;
    DB::open_cf_as_secondary(opts, path, secondary_path, column_names).map_err(other_io_err)
}

Expand Down Expand Up @@ -635,6 +688,33 @@ impl Database {
HashMap::new()
}
}

/// Try to catch up a secondary instance with
/// the primary by reading as much from the logs as possible.
///
/// Guaranteed to have changes up to the time that `try_catch_up_with_primary` is called
/// if it finishes successfully.
///
/// Blocks until the MANIFEST file and any state changes in the corresponding Write-Ahead-Logs
/// are applied to the secondary instance. If the manifest files are very large
/// this method could take a long time.
///
/// If Write-Ahead-Logs have been purged by the primary instance before the secondary
/// is able to open them, the secondary will not be caught up
/// until this function is called again and new Write-Ahead-Logs are identified.
///
/// If called while the primary is writing, the catch-up may fail.
///
/// If the secondary is unable to catch up because of missing logs,
/// this method fails silently and no error is returned.
///
/// Calling this as primary will return an error.
///
/// When no database handle is currently held (the inner `Option` is `None`),
/// this does nothing and returns `Ok(())`.
pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
    let guard = self.db.read();
    if let Some(DBAndColumns { db, .. }) = guard.as_ref() {
        db.try_catch_up_with_primary().map_err(other_io_err)
    } else {
        Ok(())
    }
}
}

// duplicate declaration of methods here to avoid trait import in certain existing cases
Expand Down Expand Up @@ -755,6 +835,48 @@ mod tests {
st::test_io_stats(&db)
}

/// A secondary instance opened on top of an existing primary database
/// must be able to read a value that the primary wrote before the secondary was opened.
#[test]
fn secondary_db_get() -> io::Result<()> {
    // Both temporary directories are bound to locals so they outlive the database handles.
    // Creating the secondary `TempDir` inline would drop it (and delete its directory)
    // at the end of the statement, before the secondary instance is ever opened.
    let primary = TempDir::new("")?;
    let secondary = TempDir::new("")?;
    let primary_path = primary.path().to_str().expect("tempdir path is valid unicode");
    // Fail loudly on a non-unicode path instead of silently ending up with `secondary: None`,
    // which would open a second primary rather than a secondary.
    let secondary_path = secondary.path().to_str().expect("tempdir path is valid unicode");

    let config = DatabaseConfig::with_columns(1);
    let db = Database::open(&config, primary_path)?;

    let key1 = b"key1";
    let mut transaction = db.transaction();
    transaction.put(0, key1, b"horse");
    db.write(transaction)?;

    let config = DatabaseConfig {
        secondary: Some(secondary_path.to_owned()),
        ..DatabaseConfig::with_columns(1)
    };
    let second_db = Database::open(&config, primary_path)?;
    assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
    Ok(())
}

/// A secondary instance opened before the primary writes must observe those writes
/// after an explicit `try_catch_up_with_primary` call.
#[test]
fn secondary_db_catch_up() -> io::Result<()> {
    // Both temporary directories are bound to locals so they outlive the database handles.
    // Creating the secondary `TempDir` inline would drop it (and delete its directory)
    // at the end of the statement, before the secondary instance is ever opened.
    let primary = TempDir::new("")?;
    let secondary = TempDir::new("")?;
    let primary_path = primary.path().to_str().expect("tempdir path is valid unicode");
    // Fail loudly on a non-unicode path instead of silently ending up with `secondary: None`,
    // which would open a second primary rather than a secondary.
    let secondary_path = secondary.path().to_str().expect("tempdir path is valid unicode");

    let config = DatabaseConfig::with_columns(1);
    let db = Database::open(&config, primary_path)?;

    let config = DatabaseConfig {
        secondary: Some(secondary_path.to_owned()),
        ..DatabaseConfig::with_columns(1)
    };
    let second_db = Database::open(&config, primary_path)?;

    // Written after the secondary was opened, so only visible to it once it catches up.
    let mut transaction = db.transaction();
    transaction.put(0, b"key1", b"mule");
    transaction.put(0, b"key2", b"cat");
    db.write(transaction)?;

    second_db.try_catch_up_with_primary()?;
    assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
    Ok(())
}

#[test]
fn mem_tables_size() {
let tempdir = TempDir::new("").unwrap();
Expand All @@ -766,6 +888,7 @@ mod tests {
columns: 11,
keep_log_file_num: 1,
enable_statistics: false,
secondary: None,
};

let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
Expand Down

0 comments on commit 990d45d

Please sign in to comment.