Skip to content

Commit

Permalink
Merge pull request #357 from CosmWasm/349-transaction-cleanup
Browse files Browse the repository at this point in the history
Add transactional helper
  • Loading branch information
ethanfrey authored Jul 28, 2021
2 parents 6c091b2 + ddc3223 commit dae72a3
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 141 deletions.
63 changes: 26 additions & 37 deletions packages/multi-test/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use serde::Serialize;
use crate::bank::Bank;
use crate::contracts::Contract;
use crate::executor::{AppResponse, Executor};
use crate::transactions::StorageTransaction;
use crate::transactions::transactional;
use crate::wasm::{Wasm, WasmKeeper};

pub fn next_block(block: &mut BlockInfo) {
Expand Down Expand Up @@ -106,22 +106,12 @@ where
// meaning, wrap current state, all writes go to a cache, only when execute
// returns a success do we flush it (otherwise drop it)

let mut cache = StorageTransaction::new(self.storage.as_ref());

// run all messages, stops at first error
let res: Result<Vec<AppResponse>, String> = msgs
.into_iter()
.map(|msg| {
self.router
.execute(&mut cache, &self.block, sender.clone(), msg)
})
.collect();

// this only happens if all messages run successfully
if res.is_ok() {
cache.prepare().commit(self.storage.as_mut())
}
res
let (block, router) = (&self.block, &self.router);
transactional(self.storage.as_mut(), |write_cache, _| {
msgs.into_iter()
.map(|msg| router.execute(write_cache, block, sender.clone(), msg))
.collect()
})
}

/// This is an "admin" function to let us adjust bank accounts
Expand Down Expand Up @@ -277,6 +267,7 @@ mod test {
PayoutCountResponse, PayoutInitMessage, PayoutQueryMsg, PayoutSudoMsg, ReflectMessage,
ReflectQueryMsg,
};
use crate::transactions::StorageTransaction;
use crate::BankKeeper;

use super::*;
Expand Down Expand Up @@ -745,28 +736,26 @@ mod test {
assert_eq!(router_rcpt, vec![]);

// now, second level cache
let mut cache2 = cache.cache();
let msg = BankMsg::Send {
to_address: rcpt.clone().into(),
amount: coins(12, "eth"),
};
app.router
.execute(&mut cache2, &app.block, owner, msg.into())
.unwrap();

// shows up in 2nd cache
let cached_rcpt = query_router(&app.router, &cache, &rcpt);
assert_eq!(coins(25, "eth"), cached_rcpt);
let cached2_rcpt = query_router(&app.router, &cache2, &rcpt);
assert_eq!(coins(37, "eth"), cached2_rcpt);

// apply second to first
let ops = cache2.prepare();
ops.commit(&mut cache);
transactional(&mut cache, |cache2, read| {
let msg = BankMsg::Send {
to_address: rcpt.clone().into(),
amount: coins(12, "eth"),
};
app.router
.execute(cache2, &app.block, owner, msg.into())
.unwrap();

// shows up in 2nd cache
let cached_rcpt = query_router(&app.router, read, &rcpt);
assert_eq!(coins(25, "eth"), cached_rcpt);
let cached2_rcpt = query_router(&app.router, cache2, &rcpt);
assert_eq!(coins(37, "eth"), cached2_rcpt);
Ok(())
})
.unwrap();

// apply first to router
let ops = cache.prepare();
ops.commit(app.storage.as_mut());
cache.prepare().commit(app.storage.as_mut());

let committed = query_app(&app, &rcpt);
assert_eq!(coins(37, "eth"), committed);
Expand Down
32 changes: 10 additions & 22 deletions packages/multi-test/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ use cosmwasm_std::{Order, Pair};
/// This is internal as it can change any time if the map implementation is swapped out.
type BTreeMapPairRef<'a, T = Vec<u8>> = (&'a Vec<u8>, &'a T);

pub fn transactional<F, T>(base: &mut dyn Storage, action: F) -> Result<T, String>
where
F: FnOnce(&mut dyn Storage, &dyn Storage) -> Result<T, String>,
{
let mut cache = StorageTransaction::new(base);
let res = action(&mut cache, base)?;
cache.prepare().commit(base);
Ok(res)
}

pub struct StorageTransaction<'a> {
/// read-only access to backing storage
storage: &'a dyn Storage,
Expand All @@ -35,19 +45,10 @@ impl<'a> StorageTransaction<'a> {
}
}

#[allow(dead_code)]
pub fn cache(&self) -> StorageTransaction {
StorageTransaction::new(self)
}

/// prepares this transaction to be committed to storage
pub fn prepare(self) -> RepLog {
self.rep_log
}

/// rollback will consume the checkpoint and drop all changes (not really needed, going out of scope does the same, but nice for clarity)
#[allow(dead_code)]
pub fn rollback(self) {}
}

impl<'a> Storage for StorageTransaction<'a> {
Expand Down Expand Up @@ -572,19 +573,6 @@ mod test {
assert_eq!(base.get(b"subtx"), Some(b"works".to_vec()));
}

#[test]
fn rollback_has_no_effect() {
let mut base = MemoryStorage::new();
base.set(b"foo", b"bar");

let mut check = StorageTransaction::new(&base);
assert_eq!(check.get(b"foo"), Some(b"bar".to_vec()));
check.set(b"subtx", b"works");
check.rollback();

assert_eq!(base.get(b"subtx"), None);
}

#[test]
fn ignore_same_as_rollback() {
let mut base = MemoryStorage::new();
Expand Down
174 changes: 92 additions & 82 deletions packages/multi-test/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use cw_storage_plus::Map;
use crate::app::{Router, RouterQuerier};
use crate::contracts::Contract;
use crate::executor::AppResponse;
use crate::transactions::StorageTransaction;
use crate::transactions::transactional;

// Contract state is kept in Storage, separate from the contracts themselves
const CONTRACTS: Map<&Addr, ContractData> = Map::new("contracts");
Expand Down Expand Up @@ -287,19 +287,21 @@ where
contract: Addr,
msg: SubMsg<C>,
) -> Result<AppResponse, String> {
let SubMsg {
msg, id, reply_on, ..
} = msg;

// execute in cache
let mut cache = StorageTransaction::new(storage);
let res = router.execute(&mut cache, block, contract.clone(), msg.msg);
if res.is_ok() {
cache.prepare().commit(storage);
}
let res = transactional(storage, |write_cache, _| {
router.execute(write_cache, block, contract.clone(), msg)
});

// call reply if meaningful
if let Ok(r) = res {
if matches!(msg.reply_on, ReplyOn::Always | ReplyOn::Success) {
if matches!(reply_on, ReplyOn::Always | ReplyOn::Success) {
let mut orig = r.clone();
let reply = Reply {
id: msg.id,
id,
result: ContractResult::Ok(SubMsgExecutionResponse {
events: r.events,
data: r.data,
Expand All @@ -318,9 +320,9 @@ where
Ok(r)
}
} else if let Err(e) = res {
if matches!(msg.reply_on, ReplyOn::Always | ReplyOn::Error) {
if matches!(reply_on, ReplyOn::Always | ReplyOn::Error) {
let reply = Reply {
id: msg.id,
id,
result: ContractResult::Err(e),
};
self._reply(router, storage, block, contract, reply)
Expand Down Expand Up @@ -513,23 +515,22 @@ where
.get(&contract.code_id)
.ok_or_else(|| "Unregistered code id".to_string())?;

let mut cache = StorageTransaction::new(storage);
let mut contract_storage = self.contract_storage(&mut cache, &address);
let querier = RouterQuerier::new(router, storage, block);
let env = self.get_env(address, block);

let deps = DepsMut {
storage: contract_storage.as_mut(),
api: self.api.deref(),
querier: QuerierWrapper::new(&querier),
};
let res = action(handler, deps, env)?;

// forces drop, so we can write to cache
drop(contract_storage);
// if this succeeds, commit
cache.prepare().commit(storage);
Ok(res)
// We don't actually need a transaction here, as it is already embedded in a transactional.
// execute_submsg or App.execute_multi.
// However, we need to get write and read access to the same storage in two different objects,
// and this is the only way I know how to do so.
transactional(storage, |write_cache, read_store| {
let mut contract_storage = self.contract_storage(write_cache, &address);
let querier = RouterQuerier::new(router, read_store, block);
let env = self.get_env(address, block);

let deps = DepsMut {
storage: contract_storage.as_mut(),
api: self.api.deref(),
querier: QuerierWrapper::new(&querier),
};
action(handler, deps, env)
})
}

fn load_contract(&self, storage: &dyn Storage, address: &Addr) -> Result<ContractData, String> {
Expand Down Expand Up @@ -816,68 +817,77 @@ mod test {
.unwrap();
cache.prepare().commit(&mut wasm_storage);

// create a new cache and check we can use contract 1
let mut cache = StorageTransaction::new(&wasm_storage);
assert_payout(&keeper, &mut cache, &contract1, &payout1);

// create contract 2 and use it
let contract2 = keeper.register_contract(&mut cache, code_id).unwrap();
let payout2 = coin(50, "BTC");
let info = mock_info("foobar", &[]);
let init_msg = to_vec(&PayoutInitMessage {
payout: payout2.clone(),
})
.unwrap();
let _res = keeper
.call_instantiate(
contract2.clone(),
&mut cache,
&mock_router(),
&block,
info,
init_msg,
)
.unwrap();
assert_payout(&keeper, &mut cache, &contract2, &payout2);

// create a level2 cache and check we can use contract 1 and contract 2
let mut cache2 = cache.cache();
assert_payout(&keeper, &mut cache2, &contract1, &payout1);
assert_payout(&keeper, &mut cache2, &contract2, &payout2);

// create a contract on level 2
let contract3 = keeper.register_contract(&mut cache2, code_id).unwrap();
let payout3 = coin(1234, "ATOM");
let info = mock_info("johnny", &[]);
let init_msg = to_vec(&PayoutInitMessage {
payout: payout3.clone(),
})
.unwrap();
let _res = keeper
.call_instantiate(
contract3.clone(),
&mut cache2,
&mock_router(),
&block,
info,
init_msg,
)

// create a new cache and check we can use contract 1
let (contract2, contract3) = transactional(&mut wasm_storage, |cache, wasm_reader| {
assert_payout(&keeper, cache, &contract1, &payout1);

// create contract 2 and use it
let contract2 = keeper.register_contract(cache, code_id).unwrap();
let info = mock_info("foobar", &[]);
let init_msg = to_vec(&PayoutInitMessage {
payout: payout2.clone(),
})
.unwrap();
let _res = keeper
.call_instantiate(
contract2.clone(),
cache,
&mock_router(),
&block,
info,
init_msg,
)
.unwrap();
assert_payout(&keeper, cache, &contract2, &payout2);

// create a level2 cache and check we can use contract 1 and contract 2
let contract3 = transactional(cache, |cache2, read| {
assert_payout(&keeper, cache2, &contract1, &payout1);
assert_payout(&keeper, cache2, &contract2, &payout2);

// create a contract on level 2
let contract3 = keeper.register_contract(cache2, code_id).unwrap();
let info = mock_info("johnny", &[]);
let init_msg = to_vec(&PayoutInitMessage {
payout: payout3.clone(),
})
.unwrap();
let _res = keeper
.call_instantiate(
contract3.clone(),
cache2,
&mock_router(),
&block,
info,
init_msg,
)
.unwrap();
assert_payout(&keeper, cache2, &contract3, &payout3);

// ensure first cache still doesn't see this contract
assert_no_contract(read, &contract3);
Ok(contract3)
})
.unwrap();
assert_payout(&keeper, &mut cache2, &contract3, &payout3);

// ensure first cache still doesn't see this contract
assert_no_contract(&cache, &contract3);
// after applying transaction, all contracts present on cache
assert_payout(&keeper, cache, &contract1, &payout1);
assert_payout(&keeper, cache, &contract2, &payout2);
assert_payout(&keeper, cache, &contract3, &payout3);

// apply second to first, all contracts present
cache2.prepare().commit(&mut cache);
assert_payout(&keeper, &mut cache, &contract1, &payout1);
assert_payout(&keeper, &mut cache, &contract2, &payout2);
assert_payout(&keeper, &mut cache, &contract3, &payout3);
// but not yet the root router
assert_no_contract(wasm_reader, &contract1);
assert_no_contract(wasm_reader, &contract2);
assert_no_contract(wasm_reader, &contract3);

// apply to router
cache.prepare().commit(&mut wasm_storage);
Ok((contract2, contract3))
})
.unwrap();

// make new cache and see all contracts there
// ensure that it is now applied to the router
assert_payout(&keeper, &mut wasm_storage, &contract1, &payout1);
assert_payout(&keeper, &mut wasm_storage, &contract2, &payout2);
assert_payout(&keeper, &mut wasm_storage, &contract3, &payout3);
Expand Down

0 comments on commit dae72a3

Please sign in to comment.