Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Archive Mode and groundwork for state-preserving handles #166

Merged
merged 2 commits into from
May 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion polkadot/consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ fn start_bft<F, C>(
}
};


let input = Messages {
network_stream: network.bft_messages(parent_hash),
local_id: bft_service.local_id(),
Expand Down
153 changes: 127 additions & 26 deletions substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ struct PendingBlock {
is_best: bool,
}

/// Database transaction
pub struct BlockImportOperation {
pending_state: DbState,
pending_block: Option<PendingBlock>,
}

#[derive(Clone)]
struct Meta {
best_hash: HeaderHash,
Expand Down Expand Up @@ -261,11 +255,18 @@ impl client::blockchain::Backend for BlockchainDb {
}
}

/// Database transaction
pub struct BlockImportOperation {
old_state: DbState,
updates: MemoryDB,
pending_block: Option<PendingBlock>,
}

impl client::backend::BlockImportOperation for BlockImportOperation {
type State = DbState;

fn state(&self) -> Result<&Self::State, client::error::Error> {
Ok(&self.pending_state)
Ok(&self.old_state)
}

fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_best: bool) -> Result<(), client::error::Error> {
Expand All @@ -280,14 +281,14 @@ impl client::backend::BlockImportOperation for BlockImportOperation {
}

fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> {
self.pending_state.commit(update);
self.updates = update;
Ok(())
}

fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, iter: I) -> Result<(), client::error::Error> {
// TODO: wipe out existing trie.
let (_, update) = self.pending_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v))));
self.pending_state.commit(update);
let (_, update) = self.old_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v))));
self.updates = update;
Ok(())
}
}
Expand Down Expand Up @@ -341,10 +342,10 @@ impl<'a> HashDB for Ephemeral<'a> {
}

/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
#[derive(Clone)]
pub struct DbState {
db: Arc<KeyValueDB>,
root: TrieH256,
updates: MemoryDB,
}

impl state_machine::Backend for DbState {
Expand All @@ -364,10 +365,6 @@ impl state_machine::Backend for DbState {
.get(key).map(|x| x.map(|val| val.to_vec())).map_err(map_e)
}

fn commit(&mut self, transaction: MemoryDB) {
self.updates = transaction;
}

fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
let mut read_overlay = MemoryDB::default();
let eph = Ephemeral {
Expand Down Expand Up @@ -423,10 +420,12 @@ impl state_machine::Backend for DbState {
}
}

/// In-memory backend. Keeps all states and blocks in memory. Useful for testing.
/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks.
/// Otherwise, trie nodes are kept only from the most recent block.
pub struct Backend {
db: Arc<KeyValueDB>,
blockchain: BlockchainDb,
archive: bool,
}

impl Backend {
Expand All @@ -438,22 +437,23 @@ impl Backend {
let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?;
let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?);

Backend::from_kvdb(db as Arc<_>)
Backend::from_kvdb(db as Arc<_>, true)
}

#[cfg(test)]
fn new_test() -> Backend {
let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS));

Backend::from_kvdb(db as Arc<_>).expect("failed to create test-db")
Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db")
}

fn from_kvdb(db: Arc<KeyValueDB>) -> Result<Backend, client::error::Error> {
fn from_kvdb(db: Arc<KeyValueDB>, archive: bool) -> Result<Backend, client::error::Error> {
let blockchain = BlockchainDb::new(db.clone())?;

Ok(Backend {
db,
blockchain,
archive
})
}
}
Expand All @@ -467,7 +467,8 @@ impl client::backend::Backend for Backend {
let state = self.state_at(block)?;
Ok(BlockImportOperation {
pending_block: None,
pending_state: state,
old_state: state,
updates: MemoryDB::default(),
})
}

Expand All @@ -488,10 +489,10 @@ impl client::backend::Backend for Backend {
if pending_block.is_best {
transaction.put(columns::META, meta::BEST_BLOCK, &key);
}
for (key, (val, rc)) in operation.pending_state.updates.drain() {
for (key, (val, rc)) in operation.updates.drain() {
if rc > 0 {
transaction.put(columns::STATE, &key.0[..], &val);
} else {
} else if rc < 0 && !self.archive {
transaction.delete(columns::STATE, &key.0[..]);
}
}
Expand All @@ -518,7 +519,6 @@ impl client::backend::Backend for Backend {

return Ok(DbState {
db: self.db.clone(),
updates: Default::default(),
root,
})
}
Expand All @@ -528,7 +528,6 @@ impl client::backend::Backend for Backend {
self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| {
DbState {
db: self.db.clone(),
updates: Default::default(),
root: hdr.state_root.0.into(),
}
}).ok_or_else(|| client::error::ErrorKind::UnknownBlock(block).into()))
Expand Down Expand Up @@ -595,7 +594,7 @@ mod tests {
(vec![1, 2, 3], vec![9, 9, 9]),
];

header.state_root = op.pending_state.storage_root(storage
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
Expand Down Expand Up @@ -634,7 +633,7 @@ mod tests {
(vec![5, 5, 5], Some(vec![4, 5, 6])),
];

let (root, overlay) = op.pending_state.storage_root(storage.iter().cloned());
let (root, overlay) = op.old_state.storage_root(storage.iter().cloned());
op.update_storage(overlay).unwrap();
header.state_root = root.into();

Expand All @@ -654,4 +653,106 @@ mod tests {
assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6]));
}
}

#[test]
fn delete_only_when_negative_rc() {
let key;
let db = Backend::new_test();

{
let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap();
let mut header = block::Header {
number: 0,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};

let storage: Vec<(_, _)> = vec![];

header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();

op.reset_storage(storage.iter().cloned()).unwrap();

key = op.updates.insert(b"hello");
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();

db.commit_operation(op).unwrap();

assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}

{
let mut op = db.begin_operation(BlockId::Number(0)).unwrap();
let mut header = block::Header {
number: 1,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};

let storage: Vec<(_, _)> = vec![];

header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();

op.updates.insert(b"hello");
op.updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();

db.commit_operation(op).unwrap();

assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]);
}

{
let mut op = db.begin_operation(BlockId::Number(1)).unwrap();
let mut header = block::Header {
number: 1,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};

let storage: Vec<(_, _)> = vec![];

header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();

op.updates.remove(&key);
op.set_block_data(
header,
Some(vec![]),
None,
true
).unwrap();

db.commit_operation(op).unwrap();

assert!(db.db.get(::columns::STATE, &key.0[..]).unwrap().is_none());
}
}
}
9 changes: 8 additions & 1 deletion substrate/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ pub trait BlockImportOperation {
}

/// Client backend. Manages the data layer.
///
/// Note on state pruning: while an object from `state_at` is alive, the state
/// should not be pruned. The backend should internally reference-count
/// its state objects.
///
/// The same applies for live `BlockImportOperation`s: while an import operation building on a parent `P`
/// is alive, the state for `P` should not be pruned.
pub trait Backend {
/// Associated block insertion operation type.
type BlockImportOperation: BlockImportOperation;
Expand All @@ -52,6 +59,6 @@ pub trait Backend {
fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>;
/// Returns reference to blockchain backend.
fn blockchain(&self) -> &Self::Blockchain;
/// Returns state backend for specified block.
/// Returns state backend with post-state of given block.
fn state_at(&self, block: BlockId) -> error::Result<Self::State>;
}
Loading