
I/O statistic for key-value databases #294

Merged (17 commits) on Jan 2, 2020
kvdb-rocksdb/src/lib.rs (119 additions, 9 deletions)
@@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

mod iter;
mod stats;

use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result};

@@ -271,6 +272,8 @@ pub struct Database {
block_opts: BlockBasedOptions,
// Dirty values added with `write_buffered`. Cleaned on `flush`.
overlay: RwLock<Vec<HashMap<DBKey, KeyState>>>,
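// Running I/O statistics, updated on every read, write and transaction.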
#[ignore_malloc_size_of = "insignificant"]
stats: stats::RunningDbStats,
// Values currently being flushed. Cleared when `flush` completes.
flushing: RwLock<Vec<HashMap<DBKey, KeyState>>>,
// Prevents concurrent flushes.
@@ -403,6 +406,7 @@ impl Database {
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
}

@@ -428,20 +432,32 @@ impl Database {
match *self.db.read() {
Some(ref cfs) => {
let mut batch = WriteBatch::default();
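// Count the ops and payload bytes in this flush for the I/O statistics.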
let mut ops: usize = 0;
let mut bytes: usize = 0;
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
{
for (c, column) in self.flushing.read().iter().enumerate() {
ops += column.len();
for (key, state) in column.iter() {
let cf = cfs.cf(c);
match *state {
KeyState::Delete => batch.delete_cf(cf, key).map_err(other_io_err)?,
KeyState::Insert(ref value) => batch.put_cf(cf, key, value).map_err(other_io_err)?,
KeyState::Delete => {
bytes += key.len();
batch.delete_cf(cf, key).map_err(other_io_err)?
}
KeyState::Insert(ref value) => {
bytes += key.len() + value.len();
batch.put_cf(cf, key, value).map_err(other_io_err)?
}
};
}
}
}

check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?;
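// Record the flushed batch in the running statistics.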
self.stats.tally_transactions(1);
self.stats.tally_writes(ops as u64);
self.stats.tally_bytes_written(bytes as u64);

for column in self.flushing.write().iter_mut() {
column.clear();
@@ -474,17 +490,31 @@
Some(ref cfs) => {
let mut batch = WriteBatch::default();
let ops = tr.ops;

self.stats.tally_writes(ops.len() as u64);
self.stats.tally_transactions(1);

let mut stats_total_bytes = 0;

for op in ops {
// remove any buffered operation for this key
self.overlay.write()[op.col() as usize].remove(op.key());

let cf = cfs.cf(op.col() as usize);

match op {
DBOp::Insert { col: _, key, value } => batch.put_cf(cf, &key, &value).map_err(other_io_err)?,
DBOp::Delete { col: _, key } => batch.delete_cf(cf, &key).map_err(other_io_err)?,
DBOp::Insert { col: _, key, value } => {
stats_total_bytes += key.len() + value.len();
batch.put_cf(cf, &key, &value).map_err(other_io_err)?
}
DBOp::Delete { col: _, key } => {
// We count deletes as writes.
stats_total_bytes += key.len();
batch.delete_cf(cf, &key).map_err(other_io_err)?
}
};
}
self.stats.tally_bytes_written(stats_total_bytes as u64);

check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))
}
@@ -496,6 +526,7 @@
pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
match *self.db.read() {
Some(ref cfs) => {
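// Every call to `get` counts as one read, whether it is served from the overlay, the flushing buffer or the backend.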
self.stats.tally_reads(1);
let overlay = &self.overlay.read()[col as usize];
match overlay.get(key) {
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
@@ -505,11 +536,21 @@
match flushing.get(key) {
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
Some(&KeyState::Delete) => Ok(None),
None => cfs
.db
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err),
None => {
let acquired_val = cfs
.db
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err);

match acquired_val {
Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
_ => {}
};

acquired_val
}
}
}
}
@@ -704,6 +745,26 @@ impl KeyValueDB for Database {
fn restore(&self, new_db: &str) -> io::Result<()> {
Database::restore(self, new_db)
}

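/// Collect the accumulated I/O statistics. `SincePrevious` drains the running counters
/// into the overall totals and reports only the last period; `Overall` reports everything
/// accumulated since the database was opened.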
fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
let taken_stats = match kind {
kvdb::IoStatsKind::Overall => self.stats.overall(),
kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
};

let mut stats = kvdb::IoStats::empty();

stats.reads = taken_stats.raw.reads;
stats.writes = taken_stats.raw.writes;
stats.transactions = taken_stats.raw.transactions;
stats.bytes_written = taken_stats.raw.bytes_written;
stats.bytes_read = taken_stats.raw.bytes_read;

stats.started = taken_stats.started;
stats.span = taken_stats.started.elapsed();

stats
}
}

impl Drop for Database {
@@ -916,6 +977,55 @@ mod tests {
assert_eq!(db.num_keys(0).unwrap(), 1, "adding a key increases the count");
}

#[test]
fn stats() {
use kvdb::IoStatsKind;

let tempdir = TempDir::new("").unwrap();
let config = DatabaseConfig::with_columns(3);
let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();

let key1 = b"kkk";
let mut batch = db.transaction();
batch.put(0, key1, key1);
batch.put(1, key1, key1);
batch.put(2, key1, key1);

for _ in 0..10 {
db.get(0, key1).unwrap();
}

db.write(batch).unwrap();

let io_stats = db.io_stats(IoStatsKind::SincePrevious);
assert_eq!(io_stats.transactions, 1);
assert_eq!(io_stats.writes, 3);
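// 3 puts * (3-byte key + 3-byte value) = 18 bytes written.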
assert_eq!(io_stats.bytes_written, 18);
assert_eq!(io_stats.reads, 10);
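// The reads above ran before the batch was written, so each one tallies only the 3-byte key: 10 * 3 = 30.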
assert_eq!(io_stats.bytes_read, 30);

let new_io_stats = db.io_stats(IoStatsKind::SincePrevious);
// Since we took the previous statistics period,
// this one is expected to be completely empty.
assert_eq!(new_io_stats.transactions, 0);

// but the overall stats should still be there
let new_io_stats = db.io_stats(IoStatsKind::Overall);
assert_eq!(new_io_stats.bytes_written, 18);

let mut batch = db.transaction();
batch.delete(0, key1);
batch.delete(1, key1);
batch.delete(2, key1);

// the transaction is not committed yet
assert_eq!(db.io_stats(IoStatsKind::SincePrevious).writes, 0);

db.write(batch).unwrap();
// now it is, and deletes are counted as writes
assert_eq!(db.io_stats(IoStatsKind::SincePrevious).writes, 3);
}

#[test]
fn test_iter_by_prefix() {
let tempdir = TempDir::new("").unwrap();
kvdb-rocksdb/src/stats.rs (144 additions, 0 deletions)
@@ -0,0 +1,144 @@
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use parking_lot::RwLock;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::Instant;

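/// Raw I/O counters accumulated over one statistics period.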
pub struct RawDbStats {
pub reads: u64,
pub writes: u64,
pub bytes_written: u64,
pub bytes_read: u64,
pub transactions: u64,
}

impl RawDbStats {
fn combine(&self, other: &RawDbStats) -> Self {
RawDbStats {
reads: self.reads + other.reads,
writes: self.writes + other.writes,
bytes_written: self.bytes_written + other.bytes_written,
bytes_read: self.bytes_read + other.bytes_read,
transactions: self.transactions + other.transactions,
}
}
}

struct OverallDbStats {
stats: RawDbStats,
last_taken: Instant,
started: Instant,
}

impl OverallDbStats {
fn new() -> Self {
OverallDbStats {
stats: RawDbStats { reads: 0, writes: 0, bytes_written: 0, bytes_read: 0, transactions: 0 },
last_taken: Instant::now(),
started: Instant::now(),
}
}
}

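/// Lock-free running counters for the current period, plus the overall totals accumulated so far.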
pub struct RunningDbStats {
reads: AtomicU64,
writes: AtomicU64,
bytes_written: AtomicU64,
bytes_read: AtomicU64,
transactions: AtomicU64,
overall: RwLock<OverallDbStats>,
}

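/// A snapshot of the counters together with the instant the reported period started.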
pub struct TakenDbStats {
pub raw: RawDbStats,
pub started: Instant,
}

impl RunningDbStats {
pub fn new() -> Self {
Self {
reads: 0.into(),
bytes_read: 0.into(),
writes: 0.into(),
bytes_written: 0.into(),
transactions: 0.into(),
overall: OverallDbStats::new().into(),
}
}

pub fn tally_reads(&self, val: u64) {
self.reads.fetch_add(val, AtomicOrdering::Relaxed);
}

pub fn tally_bytes_read(&self, val: u64) {
self.bytes_read.fetch_add(val, AtomicOrdering::Relaxed);
}

pub fn tally_writes(&self, val: u64) {
self.writes.fetch_add(val, AtomicOrdering::Relaxed);
}

pub fn tally_bytes_written(&self, val: u64) {
self.bytes_written.fetch_add(val, AtomicOrdering::Relaxed);
}

pub fn tally_transactions(&self, val: u64) {
self.transactions.fetch_add(val, AtomicOrdering::Relaxed);
}

fn take_current(&self) -> RawDbStats {
RawDbStats {
reads: self.reads.swap(0, AtomicOrdering::Relaxed),
writes: self.writes.swap(0, AtomicOrdering::Relaxed),
bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed),
bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed),
transactions: self.transactions.swap(0, AtomicOrdering::Relaxed),
}
}

fn peek_current(&self) -> RawDbStats {
RawDbStats {
reads: self.reads.load(AtomicOrdering::Relaxed),
writes: self.writes.load(AtomicOrdering::Relaxed),
bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed),
bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed),
transactions: self.transactions.load(AtomicOrdering::Relaxed),
}
}

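/// Take the counters accumulated since the previous call, fold them into the overall totals and reset the current period.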
pub fn since_previous(&self) -> TakenDbStats {
let mut overall_lock = self.overall.write();

let current = self.take_current();

overall_lock.stats = overall_lock.stats.combine(&current);

let stats = TakenDbStats { raw: current, started: overall_lock.last_taken };

overall_lock.last_taken = Instant::now();

stats
}

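/// Report the totals accumulated since the database was opened, without resetting anything.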
pub fn overall(&self) -> TakenDbStats {
let overall_lock = self.overall.read();

let current = self.peek_current();

TakenDbStats { raw: overall_lock.stats.combine(&current), started: overall_lock.started }
}
}