Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Archive Mode and groundwork for state-preserving handles (#166)
Browse files Browse the repository at this point in the history
* less eager deletion of DB values

* archive mode
  • Loading branch information
rphmeier authored and gavofyork committed May 17, 2018
1 parent 126498a commit 654e652
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 65 deletions.
1 change: 0 additions & 1 deletion polkadot/consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ fn start_bft<F, C>(
}
};


let input = Messages {
network_stream: network.bft_messages(parent_hash),
local_id: bft_service.local_id(),
Expand Down
153 changes: 127 additions & 26 deletions substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ struct PendingBlock {
is_best: bool,
}

/// Database transaction
pub struct BlockImportOperation {
pending_state: DbState,
pending_block: Option<PendingBlock>,
}

#[derive(Clone)]
struct Meta {
best_hash: HeaderHash,
Expand Down Expand Up @@ -261,11 +255,18 @@ impl client::blockchain::Backend for BlockchainDb {
}
}

/// Database transaction
pub struct BlockImportOperation {
old_state: DbState,
updates: MemoryDB,
pending_block: Option<PendingBlock>,
}

impl client::backend::BlockImportOperation for BlockImportOperation {
type State = DbState;

fn state(&self) -> Result<&Self::State, client::error::Error> {
Ok(&self.pending_state)
Ok(&self.old_state)
}

fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_best: bool) -> Result<(), client::error::Error> {
Expand All @@ -280,14 +281,14 @@ impl client::backend::BlockImportOperation for BlockImportOperation {
}

fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> {
self.pending_state.commit(update);
self.updates = update;
Ok(())
}

fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> Result<(), client::error::Error> {
// TODO: wipe out existing trie.
let (_, update) = self.pending_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v))));
self.pending_state.commit(update);
let (_, update) = self.old_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v))));
self.updates = update;
Ok(())
}
}
Expand Down Expand Up @@ -341,10 +342,10 @@ impl<'a> HashDB for Ephemeral<'a> {
}

/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
#[derive(Clone)]
pub struct DbState {
db: Arc<KeyValueDB>,
root: TrieH256,
updates: MemoryDB,
}

impl state_machine::Backend for DbState {
Expand All @@ -364,10 +365,6 @@ impl state_machine::Backend for DbState {
.get(key).map(|x| x.map(|val| val.to_vec())).map_err(map_e)
}

fn commit(&mut self, transaction: MemoryDB) {
self.updates = transaction;
}

fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
let mut read_overlay = MemoryDB::default();
let eph = Ephemeral {
Expand Down Expand Up @@ -423,10 +420,12 @@ impl state_machine::Backend for DbState {
}
}

/// In-memory backend. Keeps all states and blocks in memory. Useful for testing.
/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from the most recent block.
pub struct Backend {
db: Arc<KeyValueDB>,
blockchain: BlockchainDb,
archive: bool,
}

impl Backend {
Expand All @@ -438,22 +437,23 @@ impl Backend {
let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?;
let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?);

Backend::from_kvdb(db as Arc<_>)
Backend::from_kvdb(db as Arc<_>, true)
}

#[cfg(test)]
fn new_test() -> Backend {
let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS));

Backend::from_kvdb(db as Arc<_>).expect("failed to create test-db")
Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db")
}

fn from_kvdb(db: Arc<KeyValueDB>) -> Result<Backend, client::error::Error> {
fn from_kvdb(db: Arc<KeyValueDB>, archive: bool) -> Result<Backend, client::error::Error> {
let blockchain = BlockchainDb::new(db.clone())?;

Ok(Backend {
db,
blockchain,
archive
})
}
}
Expand All @@ -467,7 +467,8 @@ impl client::backend::Backend for Backend {
let state = self.state_at(block)?;
Ok(BlockImportOperation {
pending_block: None,
pending_state: state,
old_state: state,
updates: MemoryDB::default(),
})
}

Expand All @@ -488,10 +489,10 @@ impl client::backend::Backend for Backend {
if pending_block.is_best {
transaction.put(columns::META, meta::BEST_BLOCK, &key);
}
for (key, (val, rc)) in operation.pending_state.updates.drain() {
for (key, (val, rc)) in operation.updates.drain() {
if rc > 0 {
transaction.put(columns::STATE, &key.0[..], &val);
} else {
} else if rc < 0 && !self.archive {
transaction.delete(columns::STATE, &key.0[..]);
}
}
Expand All @@ -518,7 +519,6 @@ impl client::backend::Backend for Backend {

return Ok(DbState {
db: self.db.clone(),
updates: Default::default(),
root,
})
}
Expand All @@ -528,7 +528,6 @@ impl client::backend::Backend for Backend {
self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| {
DbState {
db: self.db.clone(),
updates: Default::default(),
root: hdr.state_root.0.into(),
}
}).ok_or_else(|| client::error::ErrorKind::UnknownBlock(block).into()))
Expand Down Expand Up @@ -595,7 +594,7 @@ mod tests {
(vec![1, 2, 3], vec![9, 9, 9]),
];

header.state_root = op.pending_state.storage_root(storage
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
Expand Down Expand Up @@ -634,7 +633,7 @@ mod tests {
(vec![5, 5, 5], Some(vec![4, 5, 6])),
];

let (root, overlay) = op.pending_state.storage_root(storage.iter().cloned());
let (root, overlay) = op.old_state.storage_root(storage.iter().cloned());
op.update_storage(overlay).unwrap();
header.state_root = root.into();

Expand All @@ -654,4 +653,106 @@ mod tests {
assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6]));
}
}

#[test]
fn delete_only_when_negative_rc() {
let key;
let db = Backend::new_test();

{
let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap();
let mut header = block::Header {
number: 0,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};

let storage: Vec<(_, _)> = vec![];

header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();

op.reset_storage(storage.iter().cloned()).unwrap();

key = op.updates.insert(b"hello");
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();

db.commit_operation(op).unwrap();

assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}

{
let mut op = db.begin_operation(BlockId::Number(0)).unwrap();
let mut header = block::Header {
number: 1,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};

let storage: Vec<(_, _)> = vec![];

header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();

op.updates.insert(b"hello");
op.updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();

db.commit_operation(op).unwrap();

assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}

{
let mut op = db.begin_operation(BlockId::Number(1)).unwrap();
let mut header = block::Header {
number: 1,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};

let storage: Vec<(_, _)> = vec![];

header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();

op.updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();

db.commit_operation(op).unwrap();

assert!(db.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
}
}
}
9 changes: 8 additions & 1 deletion substrate/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ pub trait BlockImportOperation {
}

/// Client backend. Manages the data layer.
///
/// Note on state pruning: while an object from `state_at` is alive, the state
/// should not be pruned. The backend should internally reference-count
/// its state objects.
///
/// The same applies for live `BlockImportOperation`s: while an import operation building on a parent `P`
/// is alive, the state for `P` should not be pruned.
pub trait Backend {
/// Associated block insertion operation type.
type BlockImportOperation: BlockImportOperation;
Expand All @@ -52,6 +59,6 @@ pub trait Backend {
fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>;
/// Returns reference to blockchain backend.
fn blockchain(&self) -> &Self::Blockchain;
/// Returns state backend for specified block.
/// Returns state backend with post-state of given block.
fn state_at(&self, block: BlockId) -> error::Result<Self::State>;
}
Loading

0 comments on commit 654e652

Please sign in to comment.