Skip to content

Commit

Permalink
prune messages from confirmation tx, not from the on_idle (#2211)
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik authored and bkontur committed May 15, 2024
1 parent cba56d0 commit d39881d
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 335 deletions.
35 changes: 0 additions & 35 deletions bridges/modules/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use bp_runtime::{
};
use codec::{Decode, Encode, MaxEncodedLen};
use frame_support::{dispatch::PostDispatchInfo, ensure, fail, traits::Get, DefaultNoBound};
use sp_runtime::traits::UniqueSaturatedFrom;
use sp_std::{marker::PhantomData, prelude::*};

mod inbound_lane;
Expand Down Expand Up @@ -153,40 +152,6 @@ pub mod pallet {
type OperatingModeStorage = PalletOperatingMode<T, I>;
}

#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I>
where
u32: TryFrom<BlockNumberFor<T>>,
{
fn on_idle(_block: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
// we'll need at least to read outbound lane state, kill a message and update lane state
let db_weight = T::DbWeight::get();
if !remaining_weight.all_gte(db_weight.reads_writes(1, 2)) {
return Weight::zero()
}

// messages from lane with index `i` in `ActiveOutboundLanes` are pruned when
// `System::block_number() % lanes.len() == i`. Otherwise we need to read lane states on
// every block, wasting the whole `remaining_weight` for nothing and causing starvation
// of the last lane pruning
let active_lanes = T::ActiveOutboundLanes::get();
let active_lanes_len = (active_lanes.len() as u32).into();
let active_lane_index = u32::unique_saturated_from(
frame_system::Pallet::<T>::block_number() % active_lanes_len,
);
let active_lane_id = active_lanes[active_lane_index as usize];

// first db read - outbound lane state
let mut active_lane = outbound_lane::<T, I>(active_lane_id);
let mut used_weight = db_weight.reads(1);
// and here we'll have writes
used_weight += active_lane.prune_messages(db_weight, remaining_weight - used_weight);

// we already checked we have enough `remaining_weight` to cover this `used_weight`
used_weight
}
}

#[pallet::call]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Change `PalletOwner`.
Expand Down
129 changes: 41 additions & 88 deletions bridges/modules/messages/src/outbound_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ use bp_messages::{
ChainWithMessages, DeliveredMessages, LaneId, MessageNonce, OutboundLaneData, UnrewardedRelayer,
};
use codec::{Decode, Encode};
use frame_support::{
traits::Get,
weights::{RuntimeDbWeight, Weight},
BoundedVec, PalletError,
};
use num_traits::Zero;
use frame_support::{traits::Get, BoundedVec, PalletError};
use scale_info::TypeInfo;
use sp_runtime::RuntimeDebug;
use sp_std::{collections::vec_deque::VecDeque, marker::PhantomData};
Expand Down Expand Up @@ -144,41 +139,17 @@ impl<S: OutboundLaneStorage> OutboundLane<S> {

ensure_unrewarded_relayers_are_correct(confirmed_messages.end, relayers)?;

// prune all confirmed messages
for nonce in confirmed_messages.begin..=confirmed_messages.end {
self.storage.remove_message(&nonce);
}

data.latest_received_nonce = confirmed_messages.end;
data.oldest_unpruned_nonce = data.latest_received_nonce.saturating_add(1);
self.storage.set_data(data);

Ok(Some(confirmed_messages))
}

/// Prune at most `max_messages_to_prune` already received messages.
///
/// Returns weight, consumed by messages pruning and lane state update.
pub fn prune_messages(
&mut self,
db_weight: RuntimeDbWeight,
mut remaining_weight: Weight,
) -> Weight {
let write_weight = db_weight.writes(1);
let two_writes_weight = write_weight + write_weight;
let mut spent_weight = Weight::zero();
let mut data = self.storage.data();
while remaining_weight.all_gte(two_writes_weight) &&
data.oldest_unpruned_nonce <= data.latest_received_nonce
{
self.storage.remove_message(&data.oldest_unpruned_nonce);

spent_weight += write_weight;
remaining_weight -= write_weight;
data.oldest_unpruned_nonce += 1;
}

if !spent_weight.is_zero() {
spent_weight += write_weight;
self.storage.set_data(data);
}

spent_weight
}
}

/// Verifies unrewarded relayers vec.
Expand Down Expand Up @@ -222,7 +193,6 @@ mod tests {
REGULAR_PAYLOAD, TEST_LANE_ID,
},
};
use frame_support::weights::constants::RocksDbWeight;
use sp_std::ops::RangeInclusive;

fn unrewarded_relayers(
Expand Down Expand Up @@ -275,12 +245,43 @@ mod tests {
assert_eq!(lane.send_message(outbound_message_data(REGULAR_PAYLOAD)), 3);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 0);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(
lane.confirm_delivery(3, 3, &unrewarded_relayers(1..=3)),
Ok(Some(delivered_messages(1..=3))),
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

#[test]
fn confirm_partial_delivery_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert_eq!(lane.send_message(outbound_message_data(REGULAR_PAYLOAD)), 1);
assert_eq!(lane.send_message(outbound_message_data(REGULAR_PAYLOAD)), 2);
assert_eq!(lane.send_message(outbound_message_data(REGULAR_PAYLOAD)), 3);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 0);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);

assert_eq!(
lane.confirm_delivery(3, 2, &unrewarded_relayers(1..=2)),
Ok(Some(delivered_messages(1..=2))),
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 2);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);

assert_eq!(
lane.confirm_delivery(3, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand All @@ -293,17 +294,20 @@ mod tests {
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 0);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(
lane.confirm_delivery(3, 3, &unrewarded_relayers(1..=3)),
Ok(Some(delivered_messages(1..=3))),
);
assert_eq!(lane.confirm_delivery(3, 3, &unrewarded_relayers(1..=3)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);

assert_eq!(lane.confirm_delivery(1, 2, &unrewarded_relayers(1..=1)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand Down Expand Up @@ -361,57 +365,6 @@ mod tests {
);
}

#[test]
fn prune_messages_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
// when lane is empty, nothing is pruned
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// when nothing is confirmed, nothing is pruned
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
assert!(lane.storage.message(&1).is_some());
assert!(lane.storage.message(&2).is_some());
assert!(lane.storage.message(&3).is_some());
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// after confirmation, some messages are received
assert_eq!(
lane.confirm_delivery(2, 2, &unrewarded_relayers(1..=2)),
Ok(Some(delivered_messages(1..=2))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(3),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_some());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);
// after last message is confirmed, everything is pruned
assert_eq!(
lane.confirm_delivery(1, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(2),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_none());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

#[test]
fn confirm_delivery_detects_when_more_than_expected_messages_are_confirmed() {
run_test(|| {
Expand Down
124 changes: 0 additions & 124 deletions bridges/modules/messages/src/tests/pallet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use frame_support::{
assert_noop, assert_ok,
dispatch::Pays,
storage::generator::{StorageMap, StorageValue},
traits::Hooks,
weights::Weight,
};
use frame_system::{EventRecord, Pallet as System, Phase};
Expand Down Expand Up @@ -834,129 +833,6 @@ fn inbound_message_details_works() {
});
}

#[test]
fn on_idle_callback_respects_remaining_weight() {
run_test(|| {
send_regular_message(TEST_LANE_ID);
send_regular_message(TEST_LANE_ID);
send_regular_message(TEST_LANE_ID);
send_regular_message(TEST_LANE_ID);

assert_ok!(Pallet::<TestRuntime>::receive_messages_delivery_proof(
RuntimeOrigin::signed(1),
prepare_messages_delivery_proof(
TEST_LANE_ID,
InboundLaneData {
last_confirmed_nonce: 4,
relayers: vec![unrewarded_relayer(1, 4, TEST_RELAYER_A)].into(),
},
),
UnrewardedRelayersState {
unrewarded_relayer_entries: 1,
messages_in_oldest_entry: 4,
total_messages: 4,
last_delivered_nonce: 4,
},
));

// all 4 messages may be pruned now
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().latest_received_nonce, 4);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 1);
System::<TestRuntime>::set_block_number(2);

// if passed wight is too low to do anything
let dbw = DbWeight::get();
assert_eq!(Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(1, 1)), Weight::zero(),);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 1);

// if passed wight is enough to prune single message
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(1, 2)),
dbw.reads_writes(1, 2),
);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 2);

// if passed wight is enough to prune two more messages
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(1, 3)),
dbw.reads_writes(1, 3),
);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 4);

// if passed wight is enough to prune many messages
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(100, 100)),
dbw.reads_writes(1, 2),
);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 5);
});
}

#[test]
fn on_idle_callback_is_rotating_lanes_to_prune() {
run_test(|| {
// send + receive confirmation for lane 1
send_regular_message(TEST_LANE_ID);
receive_messages_delivery_proof();
// send + receive confirmation for lane 2
send_regular_message(TEST_LANE_ID_2);
assert_ok!(Pallet::<TestRuntime>::receive_messages_delivery_proof(
RuntimeOrigin::signed(1),
prepare_messages_delivery_proof(
TEST_LANE_ID_2,
InboundLaneData {
last_confirmed_nonce: 1,
relayers: vec![unrewarded_relayer(1, 1, TEST_RELAYER_A)].into(),
},
),
UnrewardedRelayersState {
unrewarded_relayer_entries: 1,
messages_in_oldest_entry: 1,
total_messages: 1,
last_delivered_nonce: 1,
},
));

// nothing is pruned yet
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().latest_received_nonce, 1);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 1);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().latest_received_nonce,
1
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().oldest_unpruned_nonce,
1
);

// in block#2.on_idle lane messages of lane 1 are pruned
let dbw = DbWeight::get();
System::<TestRuntime>::set_block_number(2);
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(100, 100)),
dbw.reads_writes(1, 2),
);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 2);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().oldest_unpruned_nonce,
1
);

// in block#3.on_idle lane messages of lane 2 are pruned
System::<TestRuntime>::set_block_number(3);

assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(100, 100)),
dbw.reads_writes(1, 2),
);
assert_eq!(outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce, 2);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().oldest_unpruned_nonce,
2
);
});
}

#[test]
fn outbound_message_from_unconfigured_lane_is_rejected() {
run_test(|| {
Expand Down
Loading

0 comments on commit d39881d

Please sign in to comment.