Skip to content

Commit

Permalink
Problem: required_signatures is static
Browse files Browse the repository at this point in the history
Validators' information is completely configured through validators
contracts and does not depend on `authorities.required_signatures`
parameter of bridge's configuration.

The number of validators also could be changed during run-time and
therefore `authorities.required_signatures` parameter will not reflect
the actual number of signatures required for transaction validation.

Solution: retrieve required_signatures from RequiredSignaturesChanged
event and requiredSignatures() method

Closes omni#74
  • Loading branch information
yrashk committed May 19, 2018
1 parent cc4147c commit 255eac7
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 51 deletions.
17 changes: 17 additions & 0 deletions bridge/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ pub fn call<T: Transport>(transport: T, address: Address, payload: Bytes) -> Api
}
}

pub fn call_at<T: Transport>(transport: T, address: Address, payload: Bytes, block: Option<BlockNumber>) -> ApiCall<Bytes, T::Out> {
let future = api::Eth::new(transport).call(CallRequest {
from: None,
to: address,
gas: None,
gas_price: None,
value: None,
data: Some(payload),
}, block);

ApiCall {
future,
message: "eth_call",
}
}


/// Returns a eth_sign-compatible hash of data to sign.
/// The data is prepended with special message to prevent
/// chosen-plaintext attacks.
Expand Down
1 change: 1 addition & 0 deletions bridge/src/bridge/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl<T: Transport + Clone> Future for Deploy<T> {
checked_deposit_relay: main_receipt.block_number.low_u64(),
checked_withdraw_relay: test_receipt.block_number.low_u64(),
checked_withdraw_confirm: test_receipt.block_number.low_u64(),
withdraw_relay_required_signatures: Some(self.app.config.authorities.required_signatures),
};
return Ok(Deployed::New(database).into())
},
Expand Down
2 changes: 1 addition & 1 deletion bridge/src/bridge/deposit_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use itertools::Itertools;

fn deposits_filter(home: &home::HomeBridge, address: Address) -> FilterBuilder {
let filter = home.events().deposit().create_filter();
web3_filter(filter, address)
web3_filter(filter, ::std::iter::once(address))
}

fn deposit_relay_payload(home: &home::HomeBridge, foreign: &foreign::ForeignBridge, log: Log) -> Result<Bytes> {
Expand Down
21 changes: 13 additions & 8 deletions bridge/src/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::{Arc, RwLock};
use std::path::PathBuf;
use futures::{Stream, Poll, Async};
use web3::Transport;
use web3::types::U256;
use web3::types::{U256, Address};
use app::App;
use database::Database;
use error::{Error, ErrorKind, Result};
Expand All @@ -27,7 +27,7 @@ pub use self::withdraw_confirm::{WithdrawConfirm, create_withdraw_confirm};
#[derive(Clone, Copy)]
pub enum BridgeChecked {
DepositRelay(u64),
WithdrawRelay(u64),
WithdrawRelay((u64, u32)),
WithdrawConfirm(u64),
}

Expand All @@ -47,8 +47,9 @@ impl BridgeBackend for FileBackend {
BridgeChecked::DepositRelay(n) => {
self.database.checked_deposit_relay = n;
},
BridgeChecked::WithdrawRelay(n) => {
BridgeChecked::WithdrawRelay((n, sigs)) => {
self.database.checked_withdraw_relay = n;
self.database.withdraw_relay_required_signatures = Some(sigs);
},
BridgeChecked::WithdrawConfirm(n) => {
self.database.checked_withdraw_confirm = n;
Expand All @@ -71,17 +72,18 @@ enum BridgeStatus {
}

/// Creates new bridge.
pub fn create_bridge<T: Transport + Clone>(app: Arc<App<T>>, init: &Database, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<T, FileBackend> {
pub fn create_bridge<T: Transport + Clone>(app: Arc<App<T>>, init: &Database, home_chain_id: u64, foreign_chain_id: u64, foreign_validator_contract: Address) -> Bridge<T, FileBackend> {
let backend = FileBackend {
path: app.database_path.clone(),
database: init.clone(),
};

create_bridge_backed_by(app, init, backend, home_chain_id, foreign_chain_id)
create_bridge_backed_by(app, init, backend, home_chain_id, foreign_chain_id, foreign_validator_contract)
}

/// Creates new bridge writing to custom backend.
pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<App<T>>, init: &Database, backend: F, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<T, F> {
pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<App<T>>, init: &Database, backend: F, home_chain_id: u64, foreign_chain_id: u64,
foreign_validator_contract: Address) -> Bridge<T, F> {
let home_balance = Arc::new(RwLock::new(None));
let foreign_balance = Arc::new(RwLock::new(None));
Bridge {
Expand All @@ -90,7 +92,7 @@ pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<
foreign_balance: foreign_balance.clone(),
home_balance: home_balance.clone(),
deposit_relay: create_deposit_relay(app.clone(), init, foreign_balance.clone(), foreign_chain_id),
withdraw_relay: create_withdraw_relay(app.clone(), init, home_balance.clone(), home_chain_id),
withdraw_relay: create_withdraw_relay(app.clone(), init, home_balance.clone(), home_chain_id, foreign_validator_contract),
withdraw_confirm: create_withdraw_confirm(app.clone(), init, foreign_balance.clone(), foreign_chain_id),
state: BridgeStatus::Wait,
backend,
Expand Down Expand Up @@ -168,13 +170,15 @@ impl<T: Transport, F: BridgeBackend> Stream for Bridge<T, F> {
self.check_balances()?;
}


let w_relay = try_bridge!(self.withdraw_relay.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_relay"))).
map(BridgeChecked::WithdrawRelay);

if w_relay.is_some() {
self.check_balances()?;
}


let w_confirm = try_bridge!(self.withdraw_confirm.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_confirm"))).
map(BridgeChecked::WithdrawConfirm);

Expand Down Expand Up @@ -226,10 +230,11 @@ mod tests {
assert_eq!(1, backend.database.checked_deposit_relay);
assert_eq!(0, backend.database.checked_withdraw_confirm);
assert_eq!(0, backend.database.checked_withdraw_relay);
backend.save(vec![BridgeChecked::DepositRelay(2), BridgeChecked::WithdrawConfirm(3), BridgeChecked::WithdrawRelay(2)]).unwrap();
backend.save(vec![BridgeChecked::DepositRelay(2), BridgeChecked::WithdrawConfirm(3), BridgeChecked::WithdrawRelay((2, 1))]).unwrap();
assert_eq!(2, backend.database.checked_deposit_relay);
assert_eq!(3, backend.database.checked_withdraw_confirm);
assert_eq!(2, backend.database.checked_withdraw_relay);
assert_eq!(1, backend.database.withdraw_relay_required_signatures.unwrap());

let loaded = Database::load(path).unwrap();
assert_eq!(backend.database, loaded);
Expand Down
2 changes: 1 addition & 1 deletion bridge/src/bridge/withdraw_confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::nonce::{NonceCheck, SendRawTransaction};

fn withdraws_filter(foreign: &foreign::ForeignBridge, address: Address) -> FilterBuilder {
let filter = foreign.events().withdraw().create_filter();
web3_filter(filter, address)
web3_filter(filter, ::std::iter::once(address))
}

fn withdraw_submit_signature_payload(foreign: &foreign::ForeignBridge, withdraw_message: Vec<u8>, signature: H520) -> Bytes {
Expand Down
129 changes: 99 additions & 30 deletions bridge/src/bridge/withdraw_relay.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::{Arc, RwLock};
use futures::{self, Future, Stream, stream::{Collect, iter_ok, IterOk, Buffered}, Poll};
use futures::{self, Async, Future, Stream, stream::{Collect, iter_ok, IterOk, Buffered}, Poll};
use futures::future::{JoinAll, join_all, Join};
use tokio_timer::Timeout;
use web3::Transport;
use web3::types::{U256, Address, FilterBuilder, Log, Bytes};
use ethabi::{RawLog, self};
use ethabi::{RawLog, Topic, self};
use app::App;
use api::{self, LogStream, ApiCall};
use contracts::foreign;
Expand All @@ -18,9 +18,18 @@ use super::nonce::{NonceCheck, SendRawTransaction};
use itertools::Itertools;

/// returns a filter for `ForeignBridge.CollectedSignatures` events
fn collected_signatures_filter(foreign: &foreign::ForeignBridge, address: Address) -> FilterBuilder {
let filter = foreign.events().collected_signatures().create_filter();
web3_filter(filter, address)
fn collected_signatures_filter<I: IntoIterator<Item = Address>>(foreign: &foreign::ForeignBridge, addresses: I) -> FilterBuilder {
let mut filter = foreign.events().collected_signatures().create_filter();
let sig_filter = foreign.events().required_signatures_changed().create_filter();
// Combine with the `RequiredSignaturesChanged` event
match filter.topic0 {
Topic::This(t) => filter.topic0 = Topic::OneOf(vec![t]),
Topic::OneOf(ref mut vec) => {
vec.append(&mut sig_filter.topic0.into());
},
_ => (),
}
web3_filter(filter, addresses)
}

/// payloads for calls to `ForeignBridge.signature` and `ForeignBridge.message`
Expand All @@ -33,7 +42,12 @@ struct RelayAssignment {
message_payload: Bytes,
}

fn signatures_payload(foreign: &foreign::ForeignBridge, required_signatures: u32, my_address: Address, log: Log) -> error::Result<Option<RelayAssignment>> {
fn signatures_payload(foreign: &foreign::ForeignBridge, required_signatures: u32, my_address: Address, log: Log) -> error::Result<(Option<RelayAssignment>, u32)> {
// check if this is a RequiredSignaturesChanged event
match get_required_signatures(foreign, log.clone()) {
Some(signatures) => return Ok((None, signatures)),
None => (),
}
// convert web3::Log to ethabi::RawLog since ethabi events can
// only be parsed from the latter
let raw_log = RawLog {
Expand All @@ -45,22 +59,35 @@ fn signatures_payload(foreign: &foreign::ForeignBridge, required_signatures: u32
info!("bridge not responsible for relaying transaction to home. tx hash: {}", log.transaction_hash.unwrap());
// this authority is not responsible for relaying this transaction.
// someone else will relay this transaction to home.
return Ok(None);
return Ok((None, required_signatures));
}
let signature_payloads = (0..required_signatures).into_iter()
.map(|index| foreign.functions().signature().input(collected_signatures.message_hash, index))
.map(Into::into)
.collect();
let message_payload = foreign.functions().message().input(collected_signatures.message_hash).into();

Ok(Some(RelayAssignment {
Ok((Some(RelayAssignment {
signature_payloads,
message_payload,
}))
}), required_signatures))
}

fn get_required_signatures(foreign: &foreign::ForeignBridge, log: Log) -> Option<u32> {
// convert web3::Log to ethabi::RawLog since ethabi events can
// only be parsed from the latter
let raw_log = RawLog {
topics: log.topics.into_iter().map(|t| t.0.into()).collect(),
data: log.data.0,
};
foreign.events().required_signatures_changed().parse_log(raw_log)
.ok().map(|v| v.required_signatures.low_u32())
}


/// state of the withdraw relay state machine
pub enum WithdrawRelayState<T: Transport> {
CheckRequiredSignatures(Timeout<ApiCall<Bytes, T::Out>>),
Wait,
FetchMessagesSignatures {
future: Join<
Expand All @@ -76,20 +103,31 @@ pub enum WithdrawRelayState<T: Transport> {
Yield(Option<u64>),
}

pub fn create_withdraw_relay<T: Transport + Clone>(app: Arc<App<T>>, init: &Database, home_balance: Arc<RwLock<Option<U256>>>, home_chain_id: u64) -> WithdrawRelay<T> {
pub fn create_withdraw_relay<T: Transport + Clone>(app: Arc<App<T>>, init: &Database, home_balance: Arc<RwLock<Option<U256>>>, home_chain_id: u64,
foreign_validator_contract: Address) -> WithdrawRelay<T> {
let logs_init = api::LogStreamInit {
after: init.checked_withdraw_relay,
request_timeout: app.config.foreign.request_timeout,
poll_interval: app.config.foreign.poll_interval,
confirmations: app.config.foreign.required_confirmations,
filter: collected_signatures_filter(&app.foreign_bridge, init.foreign_contract_address),
filter: collected_signatures_filter(&app.foreign_bridge, vec![init.foreign_contract_address, foreign_validator_contract]),
};

let state = if init.withdraw_relay_required_signatures.is_none() {
let call = app.timer.timeout(api::call_at(app.connections.foreign.clone(), foreign_validator_contract,
app.foreign_bridge.functions().required_signatures().input().into(),
Some(init.checked_withdraw_relay.into())), app.config.foreign.request_timeout);
WithdrawRelayState::CheckRequiredSignatures(call)
} else {
WithdrawRelayState::Wait
};

WithdrawRelay {
logs: api::log_stream(app.connections.foreign.clone(), app.timer.clone(), logs_init),
home_contract: init.home_contract_address,
foreign_contract: init.foreign_contract_address,
state: WithdrawRelayState::Wait,
required_signatures: init.withdraw_relay_required_signatures.clone().unwrap_or(app.config.authorities.accounts.len() as u32),
state,
app,
home_balance,
home_chain_id,
Expand All @@ -104,10 +142,11 @@ pub struct WithdrawRelay<T: Transport> {
home_contract: Address,
home_balance: Arc<RwLock<Option<U256>>>,
home_chain_id: u64,
required_signatures: u32,
}

impl<T: Transport> Stream for WithdrawRelay<T> {
type Item = u64;
type Item = (u64, u32);
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Expand All @@ -116,24 +155,54 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
let contract = self.home_contract.clone();
let home = &self.app.config.home;
let t = &self.app.connections.home;
let foreign = &self.app.connections.foreign;
let chain_id = self.home_chain_id;
let foreign_bridge = &self.app.foreign_bridge;
let foreign_account = self.app.config.foreign.account;
let timer = &self.app.timer;
let foreign_contract = self.foreign_contract;
let foreign_request_timeout = self.app.config.foreign.request_timeout;

loop {
let required_signatures = self.required_signatures;
let next_state = match self.state {
WithdrawRelayState::CheckRequiredSignatures(ref mut logs) => {
let mut required_signatures = try_ready!(logs.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "checking foreign for requiredSignatures value")));
self.required_signatures = U256::from(required_signatures.0.as_slice()).low_u32();
info!("Required signatures: {}", self.required_signatures);
WithdrawRelayState::Wait
},
WithdrawRelayState::Wait => {
let item = try_stream!(self.logs.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "polling foreign for collected signatures")));
info!("got {} new signed withdraws to relay", item.logs.len());
let assignments = item.logs
.into_iter()
.map(|log| {
.fold((self.required_signatures, vec![]), |mut acc, log| {
info!("collected signature is ready for relay: tx hash: {}", log.transaction_hash.unwrap());
signatures_payload(
&self.app.foreign_bridge,
self.app.config.authorities.required_signatures,
self.app.config.foreign.account,
log)
})
.collect::<error::Result<Vec<_>>>()?;
let res = signatures_payload(
foreign_bridge,
acc.0,
foreign_account,
log);
match res {
Ok((value, required_signatures)) => {
acc.1.push(Ok(value));
(required_signatures, acc.1)
},
Err(err) => {
acc.1.push(Err(err));
(acc.0, acc.1)
},
}
});


if assignments.0 != self.required_signatures {
self.required_signatures = assignments.0;
info!("Required signatures: {} (block #{})", self.required_signatures, item.to);
}

let assignments = assignments.1.into_iter().collect::<error::Result<Vec<_>>>()?;

let (signatures, messages): (Vec<_>, Vec<_>) = assignments.into_iter()
.filter_map(|a| a)
Expand All @@ -142,19 +211,19 @@ impl<T: Transport> Stream for WithdrawRelay<T> {

let message_calls = messages.into_iter()
.map(|payload| {
self.app.timer.timeout(
api::call(&self.app.connections.foreign, self.foreign_contract.clone(), payload),
self.app.config.foreign.request_timeout)
timer.timeout(
api::call(foreign, foreign_contract.clone(), payload),
foreign_request_timeout)
})
.collect::<Vec<_>>();

let signature_calls = signatures.into_iter()
.map(|payloads| {
payloads.into_iter()
.map(|payload| {
self.app.timer.timeout(
api::call(&self.app.connections.foreign, self.foreign_contract.clone(), payload),
self.app.config.foreign.request_timeout)
timer.timeout(
api::call(foreign, foreign_contract.clone(), payload),
foreign_request_timeout)
})
.collect::<Vec<_>>()
})
Expand Down Expand Up @@ -246,7 +315,7 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
info!("waiting for signed withdraws to relay");
WithdrawRelayState::Wait
},
some => return Ok(some.into()),
Some(block) => return Ok(Async::Ready(Some((block, required_signatures)))),
}
};
self.state = next_state;
Expand Down Expand Up @@ -282,7 +351,7 @@ mod tests {
removed: None,
};

let assignment = signatures_payload(&foreign, 2, my_address, log).unwrap().unwrap();
let assignment = signatures_payload(&foreign, 2, my_address, log).unwrap().0.unwrap();
let expected_message: Bytes = "490a32c600000000000000000000000000000000000000000000000000000000000000f0".from_hex().unwrap().into();
let expected_signatures: Vec<Bytes> = vec![
"1812d99600000000000000000000000000000000000000000000000000000000000000f00000000000000000000000000000000000000000000000000000000000000000".from_hex().unwrap().into(),
Expand Down Expand Up @@ -314,6 +383,6 @@ mod tests {
};

let assignment = signatures_payload(&foreign, 2, my_address, log).unwrap();
assert_eq!(None, assignment);
assert_eq!(None, assignment.0);
}
}
Loading

0 comments on commit 255eac7

Please sign in to comment.