Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reward relayers for dispatching messages #385

Merged
merged 16 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 262 additions & 6 deletions modules/message-lane/src/inbound_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@

use bp_message_lane::{
target_chain::{DispatchMessage, DispatchMessageData, MessageDispatch},
InboundLaneData, LaneId, MessageKey, MessageNonce,
InboundLaneData, LaneId, MessageKey, MessageNonce, OutboundLaneData,
};
use sp_std::prelude::PartialEq;

/// Inbound lane storage.
pub trait InboundLaneStorage {
/// Delivery and dispatch fee type on source chain.
type MessageFee;
/// Id of relayer on source chain.
type Relayer: PartialEq;

/// Lane id.
fn id(&self) -> LaneId;
/// Return maximal number of unconfirmed messages in inbound lane.
fn max_unconfirmed_messages(&self) -> MessageNonce;
/// Get lane data from the storage.
fn data(&self) -> InboundLaneData;
fn data(&self) -> InboundLaneData<Self::Relayer>;
/// Update lane data in the storage.
fn set_data(&mut self, data: InboundLaneData);
fn set_data(&mut self, data: InboundLaneData<Self::Relayer>);
}

/// Inbound messages lane.
Expand All @@ -45,9 +50,44 @@ impl<S: InboundLaneStorage> InboundLane<S> {
InboundLane { storage }
}

/// Receive state of the corresponding outbound lane.
pub fn receive_state_update(&mut self, outbound_lane_data: OutboundLaneData) -> Option<MessageNonce> {
let mut data = self.storage.data();
if outbound_lane_data.latest_received_nonce > data.latest_received_nonce {
// this is something that should never happen if proofs are correct
return None;
}
if outbound_lane_data.latest_received_nonce <= data.latest_confirmed_nonce {
return None;
}

data.latest_confirmed_nonce = outbound_lane_data.latest_received_nonce;
// Firstly, remove all of the records where higher nonce <= new confirmed nonce
while data
.relayers
.front()
.map(|(_, nonce_high, _)| *nonce_high <= data.latest_confirmed_nonce)
.unwrap_or(false)
{
data.relayers.pop_front();
}
// Secondly, update the next record with lower nonce equal to new confirmed nonce if needed.
// Note: There will be max. 1 record to update as we don't allow messages from relayers to overlap.
match data.relayers.front_mut() {
Some((nonce_low, _, _)) if *nonce_low < data.latest_confirmed_nonce => {
*nonce_low = data.latest_confirmed_nonce + 1;
}
_ => {}
}

self.storage.set_data(data);
Some(outbound_lane_data.latest_received_nonce)
}

/// Receive new message.
pub fn receive_message<P: MessageDispatch<S::MessageFee>>(
&mut self,
relayer: S::Relayer,
nonce: MessageNonce,
message_data: DispatchMessageData<P::DispatchPayload, S::MessageFee>,
) -> bool {
Expand All @@ -57,7 +97,24 @@ impl<S: InboundLaneStorage> InboundLane<S> {
return false;
}

// if there are more unconfirmed messages than we may accept, reject this message
if self.storage.max_unconfirmed_messages() <= data.relayers.len() as MessageNonce {
return false;
}

data.latest_received_nonce = nonce;

let push_new = match data.relayers.back_mut() {
Some((_, nonce_high, last_relayer)) if last_relayer == &relayer => {
*nonce_high = nonce;
false
}
_ => true,
};
if push_new {
data.relayers.push_back((nonce, nonce, relayer));
}

self.storage.set_data(data);

P::dispatch(DispatchMessage {
Expand All @@ -77,23 +134,222 @@ mod tests {
use super::*;
use crate::{
inbound_lane,
mock::{message_data, run_test, TestMessageDispatch, TestRuntime, REGULAR_PAYLOAD, TEST_LANE_ID},
mock::{
message_data, run_test, TestMessageDispatch, TestRuntime, REGULAR_PAYLOAD, TEST_LANE_ID, TEST_RELAYER_A,
TEST_RELAYER_B, TEST_RELAYER_C,
},
DefaultInstance, RuntimeInboundLaneStorage,
};

fn receive_regular_message(
lane: &mut InboundLane<RuntimeInboundLaneStorage<TestRuntime, DefaultInstance>>,
nonce: MessageNonce,
) {
assert!(lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A,
nonce,
message_data(REGULAR_PAYLOAD).into()
));
}

#[test]
fn receive_status_update_ignores_status_from_the_future() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
receive_regular_message(&mut lane, 1);
assert_eq!(
lane.receive_state_update(OutboundLaneData {
latest_received_nonce: 10,
..Default::default()
}),
None,
);

assert_eq!(lane.storage.data().latest_confirmed_nonce, 0);
});
}

#[test]
fn receive_status_update_ignores_obsolete_status() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
receive_regular_message(&mut lane, 1);
receive_regular_message(&mut lane, 2);
receive_regular_message(&mut lane, 3);
assert_eq!(
lane.receive_state_update(OutboundLaneData {
latest_received_nonce: 3,
..Default::default()
}),
Some(3),
);
assert_eq!(lane.storage.data().latest_confirmed_nonce, 3);

assert_eq!(
lane.receive_state_update(OutboundLaneData {
latest_received_nonce: 3,
..Default::default()
}),
None,
);
assert_eq!(lane.storage.data().latest_confirmed_nonce, 3);
});
}

#[test]
fn receive_status_update_works() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
receive_regular_message(&mut lane, 1);
receive_regular_message(&mut lane, 2);
receive_regular_message(&mut lane, 3);
assert_eq!(lane.storage.data().latest_confirmed_nonce, 0);
assert_eq!(lane.storage.data().relayers, vec![(1, 3, TEST_RELAYER_A)]);

assert_eq!(
lane.receive_state_update(OutboundLaneData {
latest_received_nonce: 2,
..Default::default()
}),
Some(2),
);
assert_eq!(lane.storage.data().latest_confirmed_nonce, 2);
assert_eq!(lane.storage.data().relayers, vec![(3, 3, TEST_RELAYER_A)]);

assert_eq!(
lane.receive_state_update(OutboundLaneData {
latest_received_nonce: 3,
..Default::default()
}),
Some(3),
);
assert_eq!(lane.storage.data().latest_confirmed_nonce, 3);
assert_eq!(lane.storage.data().relayers, vec![]);
});
}

#[test]
fn receive_status_update_works_with_batches_from_relayers() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
let mut seed_storage_data = lane.storage.data();
// Prepare data
seed_storage_data.latest_confirmed_nonce = 0;
seed_storage_data.latest_received_nonce = 5;
seed_storage_data.relayers.push_back((1, 1, TEST_RELAYER_A));
// Simulate messages batch (2, 3, 4) from relayer #2
seed_storage_data.relayers.push_back((2, 4, TEST_RELAYER_B));
seed_storage_data.relayers.push_back((5, 5, TEST_RELAYER_C));
lane.storage.set_data(seed_storage_data);
// Check
assert_eq!(
lane.receive_state_update(OutboundLaneData {
latest_received_nonce: 3,
..Default::default()
}),
Some(3),
);
assert_eq!(lane.storage.data().latest_confirmed_nonce, 3);
assert_eq!(
lane.storage.data().relayers,
vec![(4, 4, TEST_RELAYER_B), (5, 5, TEST_RELAYER_C)]
);
});
}

#[test]
fn fails_to_receive_message_with_incorrect_nonce() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(!lane.receive_message::<TestMessageDispatch>(10, message_data(REGULAR_PAYLOAD).into()));
assert!(!lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A,
10,
message_data(REGULAR_PAYLOAD).into()
));
assert_eq!(lane.storage.data().latest_received_nonce, 0);
});
}

#[test]
fn fails_to_receive_messages_above_max_limit_per_lane() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
let max_nonce = <TestRuntime as crate::Trait>::MaxUnconfirmedMessagesAtInboundLane::get();
for current_nonce in 1..max_nonce + 1 {
assert!(lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A + current_nonce,
current_nonce,
message_data(REGULAR_PAYLOAD).into()
));
}
// Fails to dispatch new message from different than latest relayer.
assert_eq!(
false,
lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A + max_nonce + 1,
max_nonce + 1,
message_data(REGULAR_PAYLOAD).into()
)
);
// Fails to dispatch new messages from latest relayer. Prevents griefing attacks.
assert_eq!(
false,
lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A + max_nonce,
max_nonce + 1,
message_data(REGULAR_PAYLOAD).into()
)
);
});
}

#[test]
fn correctly_receives_following_messages_from_two_relayers_alternately() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A,
1,
message_data(REGULAR_PAYLOAD).into()
));
assert!(lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_B,
2,
message_data(REGULAR_PAYLOAD).into()
));
assert!(lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A,
3,
message_data(REGULAR_PAYLOAD).into()
));
assert_eq!(
lane.storage.data().relayers,
vec![(1, 1, TEST_RELAYER_A), (2, 2, TEST_RELAYER_B), (3, 3, TEST_RELAYER_A)]
);
});
}

#[test]
fn rejects_same_message_from_two_different_relayers() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message::<TestMessageDispatch>(
TEST_RELAYER_A,
1,
message_data(REGULAR_PAYLOAD).into()
));
assert_eq!(
false,
lane.receive_message::<TestMessageDispatch>(TEST_RELAYER_B, 1, message_data(REGULAR_PAYLOAD).into())
);
});
}

#[test]
fn correct_message_is_processed_instantly() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message::<TestMessageDispatch>(1, message_data(REGULAR_PAYLOAD).into()));
receive_regular_message(&mut lane, 1);
assert_eq!(lane.storage.data().latest_received_nonce, 1);
});
}
Expand Down
Loading