diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index fc45eed0d1a5..88e241426d62 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -18,12 +18,15 @@ use crate::{ configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler, - session_info, shared, ump, + session_info, shared, + ump::{self, MessageId, UmpSink}, + ParaId, }; -use frame_support::{parameter_types, traits::GenesisBuild}; +use frame_support::{parameter_types, traits::GenesisBuild, weights::Weight}; use frame_support_test::TestRandomness; +use parity_scale_codec::Decode; use primitives::v1::{ - AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, ValidatorIndex, + AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, UpwardMessage, ValidatorIndex, }; use sp_core::H256; use sp_io::TestExternalities; @@ -128,7 +131,7 @@ parameter_types! { impl crate::ump::Config for Test { type Event = Event; - type UmpSink = crate::ump::mock_sink::MockUmpSink; + type UmpSink = TestUmpSink; type FirstMessageFactorPercent = FirstMessageFactorPercent; } @@ -232,6 +235,41 @@ pub fn availability_rewards() -> HashMap { AVAILABILITY_REWARDS.with(|r| r.borrow().clone()) } +std::thread_local! { + static PROCESSED: RefCell> = RefCell::new(vec![]); +} + +/// Return which messages have been processed by `pocess_upward_message` and clear the buffer. +pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> { + PROCESSED.with(|opt_hook| std::mem::take(&mut *opt_hook.borrow_mut())) +} + +/// An implementation of a UMP sink that just records which messages were processed. +/// +/// A message's weight is defined by the first 4 bytes of its data, which we decode into a +/// `u32`. +pub struct TestUmpSink; +impl UmpSink for TestUmpSink { + fn process_upward_message( + actual_origin: ParaId, + actual_msg: &[u8], + max_weight: Weight, + ) -> Result { + let weight = match u32::decode(&mut &actual_msg[..]) { + Ok(w) => w as Weight, + Err(_) => return Ok(0), // same as the real `UmpSink` + }; + if weight > max_weight { + let id = sp_io::hashing::blake2_256(actual_msg); + return Err((id, weight)) + } + PROCESSED.with(|opt_hook| { + opt_hook.borrow_mut().push((actual_origin, actual_msg.to_owned())); + }); + Ok(weight) + } +} + pub struct TestRewardValidators; impl inclusion::RewardValidators for TestRewardValidators { diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 40f1a5a3acda..ae04d2d460df 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -21,11 +21,7 @@ use crate::{ use frame_support::pallet_prelude::*; use primitives::v1::{Id as ParaId, UpwardMessage}; use sp_std::{ - collections::{btree_map::BTreeMap, vec_deque::VecDeque}, - convert::TryFrom, - fmt, - marker::PhantomData, - prelude::*, + collections::btree_map::BTreeMap, convert::TryFrom, fmt, marker::PhantomData, prelude::*, }; use xcm::latest::Outcome; @@ -211,7 +207,7 @@ pub mod pallet { /// The messages are processed in FIFO order. #[pallet::storage] pub type RelayDispatchQueues = - StorageMap<_, Twox64Concat, ParaId, VecDeque, ValueQuery>; + StorageMap<_, Twox64Concat, ParaId, Vec, ValueQuery>; /// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`. /// @@ -407,23 +403,27 @@ impl Pallet { config.ump_service_total_weight - weight_used }; - // dequeue the next message from the queue of the dispatchee - let (upward_message, became_empty) = queue_cache.dequeue::(dispatchee); - if let Some(upward_message) = upward_message { - match T::UmpSink::process_upward_message( - dispatchee, - &upward_message[..], - max_weight, - ) { - Ok(used) => weight_used += used, + // attempt to process the next message from the queue of the dispatchee; if not beyond + // our remaining weight limit, then consume it. + let maybe_next = queue_cache.peek_front::(dispatchee); + let became_empty = if let Some(upward_message) = maybe_next { + match T::UmpSink::process_upward_message(dispatchee, upward_message, max_weight) { + Ok(used) => { + weight_used += used; + queue_cache.consume_front::(dispatchee) + }, Err((id, required)) => { - // we process messages in order and don't drop them if we run out of weight, so need to break - // here. + // we process messages in order and don't drop them if we run out of weight, + // so need to break here without calling `consume_front`. Self::deposit_event(Event::WeightExhausted(id, max_weight, required)); break }, } - } + } else { + // this should never happen, since the cursor should never point to an empty queue. + // it is resolved harmlessly here anyway. + true + }; if became_empty { // the queue is empty now - this para doesn't need attention anymore. @@ -442,8 +442,8 @@ impl Pallet { /// To avoid constant fetching, deserializing and serialization the queues are cached. /// -/// After an item dequeued from a queue for the first time, the queue is stored in this struct rather -/// than being serialized and persisted. +/// After an item dequeued from a queue for the first time, the queue is stored in this struct +/// rather than being serialized and persisted. /// /// This implementation works best when: /// @@ -461,9 +461,10 @@ impl Pallet { struct QueueCache(BTreeMap); struct QueueCacheEntry { - queue: VecDeque, - count: u32, + queue: Vec, total_size: u32, + consumed_count: usize, + consumed_size: usize, } impl QueueCache { @@ -471,26 +472,35 @@ impl QueueCache { Self(BTreeMap::new()) } - /// Dequeues one item from the upward message queue of the given para. + fn ensure_cached(&mut self, para: ParaId) -> &mut QueueCacheEntry { + self.0.entry(para).or_insert_with(|| { + let queue = RelayDispatchQueues::::get(¶); + let (_, total_size) = RelayDispatchQueueSize::::get(¶); + QueueCacheEntry { queue, total_size, consumed_count: 0, consumed_size: 0 } + }) + } + + /// Returns the message at the front of `para`'s queue, or `None` if the queue is empty. /// - /// Returns `(upward_message, became_empty)`, where + /// Does not mutate the queue. + fn peek_front(&mut self, para: ParaId) -> Option<&UpwardMessage> { + let entry = self.ensure_cached::(para); + entry.queue.get(entry.consumed_count) + } + + /// Attempts to remove one message from the front of `para`'s queue. If the queue is empty, then + /// does nothing. /// - /// - `upward_message` a dequeued message or `None` if the queue _was_ empty. - /// - `became_empty` is true if the queue _became_ empty. - fn dequeue(&mut self, para: ParaId) -> (Option, bool) { - let cache_entry = self.0.entry(para).or_insert_with(|| { - let queue = as Store>::RelayDispatchQueues::get(¶); - let (count, total_size) = as Store>::RelayDispatchQueueSize::get(¶); - QueueCacheEntry { queue, count, total_size } - }); - let upward_message = cache_entry.queue.pop_front(); - if let Some(ref msg) = upward_message { - cache_entry.count -= 1; - cache_entry.total_size -= msg.len() as u32; + /// Returns `true` iff there are no more messages in the queue after the removal attempt. + fn consume_front(&mut self, para: ParaId) -> bool { + let cache_entry = self.ensure_cached::(para); + let upward_message = cache_entry.queue.get(cache_entry.consumed_count); + if let Some(msg) = upward_message { + cache_entry.consumed_count += 1; + cache_entry.consumed_size += msg.len(); } - let became_empty = cache_entry.queue.is_empty(); - (upward_message, became_empty) + cache_entry.consumed_count >= cache_entry.queue.len() } /// Flushes the updated queues into the storage. @@ -498,14 +508,16 @@ impl QueueCache { // NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics // within runtime. It is dangerous to use because of double-panics and flushing on a panic // is not necessary as well. - for (para, QueueCacheEntry { queue, count, total_size }) in self.0 { - if queue.is_empty() { + for (para, entry) in self.0 { + if entry.consumed_count >= entry.queue.len() { // remove the entries altogether. - as Store>::RelayDispatchQueues::remove(¶); - as Store>::RelayDispatchQueueSize::remove(¶); - } else { - as Store>::RelayDispatchQueues::insert(¶, queue); - as Store>::RelayDispatchQueueSize::insert(¶, (count, total_size)); + RelayDispatchQueues::::remove(¶); + RelayDispatchQueueSize::::remove(¶); + } else if entry.consumed_count > 0 { + RelayDispatchQueues::::insert(¶, &entry.queue[entry.consumed_count..]); + let count = (entry.queue.len() - entry.consumed_count) as u32; + let size = entry.total_size.saturating_sub(entry.consumed_size as u32); + RelayDispatchQueueSize::::insert(¶, (count, size)); } } } @@ -586,137 +598,10 @@ impl NeedsDispatchCursor { } #[cfg(test)] -pub(crate) mod mock_sink { - //! An implementation of a mock UMP sink that allows attaching a probe for mocking the weights - //! and checking the sent messages. - //! - //! A default behavior of the UMP sink is to ignore an incoming message and return 0 weight. - //! - //! A probe can be attached to the mock UMP sink. When attached, the mock sink would consult the - //! probe to check whether the received message was expected and what weight it should return. - //! - //! There are two rules on how to use a probe: - //! - //! 1. There can be only one active probe at a time. Creation of another probe while there is - //! already an active one leads to a panic. The probe is scoped to a thread where it was created. - //! - //! 2. All messages expected by the probe must be received by the time of dropping it. Unreceived - //! messages will lead to a panic while dropping a probe. - - use super::{MessageId, ParaId, UmpSink, UpwardMessage}; +pub(crate) mod tests { + use super::*; + use crate::mock::{new_test_ext, take_processed, Configuration, MockGenesisConfig, Ump}; use frame_support::weights::Weight; - use std::{cell::RefCell, collections::vec_deque::VecDeque}; - - #[derive(Debug)] - struct UmpExpectation { - expected_origin: ParaId, - expected_msg: UpwardMessage, - mock_weight: Weight, - } - - std::thread_local! { - // `Some` here indicates that there is an active probe. - static HOOK: RefCell>> = RefCell::new(None); - } - - pub struct MockUmpSink; - impl UmpSink for MockUmpSink { - fn process_upward_message( - actual_origin: ParaId, - actual_msg: &[u8], - _max_weight: Weight, - ) -> Result { - Ok(HOOK - .with(|opt_hook| { - opt_hook.borrow_mut().as_mut().map(|hook| { - let UmpExpectation { expected_origin, expected_msg, mock_weight } = - match hook.pop_front() { - Some(expectation) => expectation, - None => { - panic!( - "The probe is active but didn't expect the message:\n\n\t{:?}.", - actual_msg, - ); - }, - }; - assert_eq!(expected_origin, actual_origin); - assert_eq!(expected_msg, &actual_msg[..]); - mock_weight - }) - }) - .unwrap_or(0)) - } - } - - pub struct Probe { - _private: (), - } - - impl Probe { - pub fn new() -> Self { - HOOK.with(|opt_hook| { - let prev = opt_hook.borrow_mut().replace(VecDeque::default()); - - // that can trigger if there were two probes were created during one session which - // is may be a bit strict, but may save time figuring out what's wrong. - // if you land here and you do need the two probes in one session consider - // dropping the the existing probe explicitly. - assert!(prev.is_none()); - }); - Self { _private: () } - } - - /// Add an expected message. - /// - /// The enqueued messages are processed in FIFO order. - pub fn assert_msg( - &mut self, - expected_origin: ParaId, - expected_msg: UpwardMessage, - mock_weight: Weight, - ) { - HOOK.with(|opt_hook| { - opt_hook.borrow_mut().as_mut().unwrap().push_back(UmpExpectation { - expected_origin, - expected_msg, - mock_weight, - }) - }); - } - } - - impl Drop for Probe { - fn drop(&mut self) { - let _ = HOOK.try_with(|opt_hook| { - let prev = opt_hook.borrow_mut().take().expect( - "this probe was created and hasn't been yet destroyed; - the probe cannot be replaced; - there is only one probe at a time allowed; - thus it cannot be `None`; - qed", - ); - - if !prev.is_empty() { - // some messages are left unchecked. We should notify the developer about this. - // however, we do so only if the thread doesn't panic already. Otherwise, the - // developer would get a SIGILL or SIGABRT without a meaningful error message. - if !std::thread::panicking() { - panic!( - "the probe is dropped and not all expected messages arrived: {:?}", - prev - ); - } - } - }); - // an `Err` here signals here that the thread local was already destroyed. - } - } -} - -#[cfg(test)] -mod tests { - use super::{mock_sink::Probe, *}; - use crate::mock::{new_test_ext, Configuration, MockGenesisConfig, Ump}; use std::collections::HashSet; struct GenesisConfigBuilder { @@ -826,15 +711,12 @@ mod tests { #[test] fn dispatch_single_message() { let a = ParaId::from(228); - let msg = vec![1, 2, 3]; + let msg = 1000u32.encode(); new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - let mut probe = Probe::new(); - - probe.assert_msg(a, msg.clone(), 0); - queue_upward_msg(a, msg); - + queue_upward_msg(a, msg.clone()); Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, msg)]); assert_storage_consistency_exhaustive(); }); @@ -846,11 +728,11 @@ mod tests { let c = ParaId::from(228); let q = ParaId::from(911); - let a_msg_1 = vec![1, 2, 3]; - let a_msg_2 = vec![3, 2, 1]; - let c_msg_1 = vec![4, 5, 6]; - let c_msg_2 = vec![9, 8, 7]; - let q_msg = b"we are Q".to_vec(); + let a_msg_1 = (200u32, "a_msg_1").encode(); + let a_msg_2 = (100u32, "a_msg_2").encode(); + let c_msg_1 = (300u32, "c_msg_1").encode(); + let c_msg_2 = (100u32, "c_msg_2").encode(); + let q_msg = (500u32, "q_msg").encode(); new_test_ext( GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(), @@ -864,52 +746,60 @@ mod tests { assert_storage_consistency_exhaustive(); // we expect only two first messages to fit in the first iteration. - { - let mut probe = Probe::new(); - - probe.assert_msg(a, a_msg_1.clone(), 300); - probe.assert_msg(c, c_msg_1.clone(), 300); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); - - drop(probe); - } + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_1), (c, c_msg_1)]); + assert_storage_consistency_exhaustive(); queue_upward_msg(c, c_msg_2.clone()); assert_storage_consistency_exhaustive(); // second iteration should process the second message. - { - let mut probe = Probe::new(); + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(q, q_msg)]); + assert_storage_consistency_exhaustive(); - probe.assert_msg(q, q_msg.clone(), 500); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); + // 3rd iteration. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_2), (c, c_msg_2)]); + assert_storage_consistency_exhaustive(); - drop(probe); - } + // finally, make sure that the queue is empty. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![]); + assert_storage_consistency_exhaustive(); + }); + } - // 3rd iteration. - { - let mut probe = Probe::new(); + #[test] + fn dispatch_keeps_message_after_weight_exhausted() { + let a = ParaId::from(128); - probe.assert_msg(a, a_msg_2.clone(), 100); - probe.assert_msg(c, c_msg_2.clone(), 100); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); + let a_msg_1 = (300u32, "a_msg_1").encode(); + let a_msg_2 = (300u32, "a_msg_2").encode(); - drop(probe); - } + new_test_ext( + GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(), + ) + .execute_with(|| { + queue_upward_msg(a, a_msg_1.clone()); + queue_upward_msg(a, a_msg_2.clone()); - // finally, make sure that the queue is empty. - { - let probe = Probe::new(); + assert_storage_consistency_exhaustive(); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); + // we expect only one message to fit in the first iteration. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_1)]); + assert_storage_consistency_exhaustive(); - drop(probe); - } + // second iteration should process the remaining message. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_2)]); + assert_storage_consistency_exhaustive(); + + // finally, make sure that the queue is empty. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![]); + assert_storage_consistency_exhaustive(); }); } @@ -918,9 +808,9 @@ mod tests { let a = ParaId::from(1991); let b = ParaId::from(1999); - let a_msg_1 = vec![1, 2, 3]; - let a_msg_2 = vec![3, 2, 1]; - let b_msg_1 = vec![4, 5, 6]; + let a_msg_1 = (300u32, "a_msg_1").encode(); + let a_msg_2 = (300u32, "a_msg_2").encode(); + let b_msg_1 = (300u32, "b_msg_1").encode(); new_test_ext( GenesisConfigBuilder { ump_service_total_weight: 900, ..Default::default() }.build(), @@ -935,18 +825,8 @@ mod tests { queue_upward_msg(a, a_msg_1.clone()); queue_upward_msg(a, a_msg_2.clone()); queue_upward_msg(b, b_msg_1.clone()); - - { - let mut probe = Probe::new(); - - probe.assert_msg(a, a_msg_1.clone(), 300); - probe.assert_msg(b, b_msg_1.clone(), 300); - probe.assert_msg(a, a_msg_2.clone(), 300); - - Ump::process_pending_upward_messages(); - - drop(probe); - } + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_1), (b, b_msg_1), (a, a_msg_2)]); }); }