Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Fixes and traces for refcountdb. #705

Merged
merged 4 commits into from
Mar 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,10 @@ impl Configuration {
}
}
client_config.pruning = match self.args.flag_pruning.as_str() {
"" | "archive" => journaldb::Algorithm::Archive,
"pruned" => journaldb::Algorithm::EarlyMerge,
"archive" => journaldb::Algorithm::Archive,
"light" => journaldb::Algorithm::EarlyMerge,
"fast" => journaldb::Algorithm::OverlayRecent,
"slow" => journaldb::Algorithm::RefCounted,
"basic" => journaldb::Algorithm::RefCounted,
_ => { die!("Invalid pruning method given."); }
};
client_config.name = self.args.flag_identity.clone();
Expand Down
3 changes: 2 additions & 1 deletion util/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use network::NetworkError;
use rlp::DecoderError;
use io;
use std::fmt;
use hash::H256;

#[derive(Debug)]
/// Error in database subsystem.
pub enum BaseDataError {
/// An entry was removed more times than inserted.
NegativelyReferencedHash,
NegativelyReferencedHash(H256),
}

#[derive(Debug)]
Expand Down
8 changes: 6 additions & 2 deletions util/src/journaldb/archivedb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl JournalDB for ArchiveDB {
Box::new(ArchiveDB {
overlay: MemoryDB::new(),
backing: self.backing.clone(),
latest_era: None,
latest_era: self.latest_era,
})
}

Expand All @@ -144,7 +144,7 @@ impl JournalDB for ArchiveDB {
self.latest_era.is_none()
}

fn commit(&mut self, _: u64, _: &H256, _: Option<(u64, H256)>) -> Result<u32, UtilError> {
fn commit(&mut self, now: u64, _: &H256, _: Option<(u64, H256)>) -> Result<u32, UtilError> {
let batch = DBTransaction::new();
let mut inserts = 0usize;
let mut deletes = 0usize;
Expand All @@ -160,6 +160,10 @@ impl JournalDB for ArchiveDB {
deletes += 1;
}
}
if self.latest_era.map_or(true, |e| now > e) {
try!(batch.put(&LATEST_ERA_KEY, &encode(&now)));
self.latest_era = Some(now);
}
try!(self.backing.write(batch));
Ok((inserts + deletes) as u32)
}
Expand Down
34 changes: 22 additions & 12 deletions util/src/journaldb/refcounteddb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct RefCountedDB {
const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];
const DB_VERSION : u32 = 512;
const PADDING : [u8; 10] = [ 0u8; 10 ];

impl RefCountedDB {
/// Create a new instance given a `backing` database.
Expand Down Expand Up @@ -131,9 +132,10 @@ impl JournalDB for RefCountedDB {
let mut last;

while try!(self.backing.get({
let mut r = RlpStream::new_list(2);
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})).is_some() {
Expand All @@ -144,7 +146,10 @@ impl JournalDB for RefCountedDB {
r.append(id);
r.append(&self.inserts);
r.append(&self.removes);
try!(self.backing.put(&last, r.as_raw()));
try!(batch.put(&last, r.as_raw()));

trace!(target: "rcdb", "new journal for time #{}.{} => {}: inserts={:?}, removes={:?}", now, index, id, self.inserts, self.removes);

self.inserts.clear();
self.removes.clear();

Expand All @@ -158,20 +163,25 @@ impl JournalDB for RefCountedDB {
if let Some((end_era, canon_id)) = end {
let mut index = 0usize;
let mut last;
while let Some(rlp_data) = try!(self.backing.get({
let mut r = RlpStream::new_list(2);
r.append(&end_era);
r.append(&index);
last = r.drain();
&last
})) {
while let Some(rlp_data) = {
// trace!(target: "rcdb", "checking for journal #{}.{}", end_era, index);
try!(self.backing.get({
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
}))
} {
let rlp = Rlp::new(&rlp_data);
let to_remove: Vec<H256> = rlp.val_at(if canon_id == rlp.val_at(0) {2} else {1});
let our_id: H256 = rlp.val_at(0);
let to_remove: Vec<H256> = rlp.val_at(if canon_id == our_id {2} else {1});
trace!(target: "rcdb", "delete journal for time #{}.{}=>{}, (canon was {}): deleting {:?}", end_era, index, our_id, canon_id, to_remove);
for i in &to_remove {
self.forward.remove(i);
}
try!(self.backing.delete(&last));
trace!("RefCountedDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len());
try!(batch.delete(&last));
index += 1;
}
}
Expand Down
57 changes: 48 additions & 9 deletions util/src/overlaydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ impl OverlayDB {
let (back_value, back_rc) = x;
let total_rc: i32 = back_rc as i32 + rc;
if total_rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash));
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
deletes += if self.put_payload(batch, &key, (back_value, total_rc as u32)) {1} else {0};
deletes += if self.put_payload_in_batch(batch, &key, (back_value, total_rc as u32)) {1} else {0};
}
None => {
if rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash));
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
self.put_payload(batch, &key, (value, rc as u32));
self.put_payload_in_batch(batch, &key, (value, rc as u32));
}
};
ret += 1;
Expand Down Expand Up @@ -116,10 +116,32 @@ impl OverlayDB {
/// }
/// ```
pub fn commit(&mut self) -> Result<u32, UtilError> {
let batch = DBTransaction::new();
let r = try!(self.commit_to_batch(&batch));
try!(self.backing.write(batch));
Ok(r)
let mut ret = 0u32;
let mut deletes = 0usize;
for i in self.overlay.drain().into_iter() {
let (key, (value, rc)) = i;
if rc != 0 {
match self.payload(&key) {
Some(x) => {
let (back_value, back_rc) = x;
let total_rc: i32 = back_rc as i32 + rc;
if total_rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
deletes += if self.put_payload(&key, (back_value, total_rc as u32)) {1} else {0};
}
None => {
if rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
self.put_payload(&key, (value, rc as u32));
}
};
ret += 1;
}
}
trace!("OverlayDB::commit() deleted {} nodes", deletes);
Ok(ret)
}

/// Revert all operations on this object (i.e. `insert()`s and `kill()`s) since the
Expand All @@ -145,6 +167,9 @@ impl OverlayDB {
/// ```
pub fn revert(&mut self) { self.overlay.clear(); }

/// Get the number of references that would be committed.
pub fn commit_refs(&self, key: &H256) -> i32 { self.overlay.raw(&key).map_or(0, |&(_, refs)| refs) }

/// Get the refs and value of the given key.
fn payload(&self, key: &H256) -> Option<(Bytes, u32)> {
self.backing.get(&key.bytes())
Expand All @@ -156,7 +181,7 @@ impl OverlayDB {
}

/// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload(&self, batch: &DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool {
fn put_payload_in_batch(&self, batch: &DBTransaction, key: &H256, payload: (Bytes, u32)) -> bool {
if payload.1 > 0 {
let mut s = RlpStream::new_list(2);
s.append(&payload.1);
Expand All @@ -168,6 +193,20 @@ impl OverlayDB {
true
}
}

/// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool {
if payload.1 > 0 {
let mut s = RlpStream::new_list(2);
s.append(&payload.1);
s.append(&payload.0);
self.backing.put(&key.bytes(), s.as_raw()).expect("Low-level database error. Some issue with your hard disk?");
false
} else {
self.backing.delete(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?");
true
}
}
}

impl HashDB for OverlayDB {
Expand Down