From 47bd959c1ee12e2083feb38bf118fa906ff6fea5 Mon Sep 17 00:00:00 2001 From: Gavin Wood Date: Fri, 9 Dec 2022 10:38:24 +0000 Subject: [PATCH] General Message Queue Pallet (#12485) * The message queue * Make fully generic * Refactor * Docs * Refactor * Use iter not slice * Per-origin queues * Multi-queue processing * Introduce MaxReady * Remove MaxReady in favour of ready ring * Cleanups * ReadyRing and tests * Stale page reaping * from_components -> from_parts Signed-off-by: Oliver Tale-Yazdi * Move WeightCounter to sp_weights Signed-off-by: Oliver Tale-Yazdi * Add MockedWeightInfo Signed-off-by: Oliver Tale-Yazdi * Deploy to kitchensink Signed-off-by: Oliver Tale-Yazdi * Use WeightCounter Signed-off-by: Oliver Tale-Yazdi * Small fixes and logging Signed-off-by: Oliver Tale-Yazdi * Add service_page Signed-off-by: Oliver Tale-Yazdi * Typo Signed-off-by: Oliver Tale-Yazdi * Move service_page below service_queue Signed-off-by: Oliver Tale-Yazdi * Add service_message Signed-off-by: Oliver Tale-Yazdi * Use correct weight function Signed-off-by: Oliver Tale-Yazdi * Overweight execution * Refactor * Missing file * Fix WeightCounter usage in scheduler Signed-off-by: Oliver Tale-Yazdi * Fix peek_index Take into account that decoding from a mutable slice modifies it. Signed-off-by: Oliver Tale-Yazdi * Add tests and bench service_page_item Signed-off-by: Oliver Tale-Yazdi * Add debug_info Signed-off-by: Oliver Tale-Yazdi * Add no-progress check to service_queues Signed-off-by: Oliver Tale-Yazdi * Add more benches Signed-off-by: Oliver Tale-Yazdi * Bound from_message and try_append_message Signed-off-by: Oliver Tale-Yazdi * Add PageReaped event Signed-off-by: Oliver Tale-Yazdi * Rename BookStateOf and BookStateFor Signed-off-by: Oliver Tale-Yazdi * Update tests and remove logging Signed-off-by: Oliver Tale-Yazdi * Remove redundant per-message origins; add footprint() and sweep_queue() * Move testing stuff to mock.rs Signed-off-by: Oliver Tale-Yazdi * Add integration test Signed-off-by: Oliver Tale-Yazdi * Fix no-progress check Signed-off-by: Oliver Tale-Yazdi * Fix debug_info Signed-off-by: Oliver Tale-Yazdi * Fixup merge and tests Signed-off-by: Oliver Tale-Yazdi * Fix footprint tracking * Introduce * Formatting * OverweightEnqueued event, auto-servicing config item * Update tests and benchmarks Signed-off-by: Oliver Tale-Yazdi * Clippy Signed-off-by: Oliver Tale-Yazdi * Add tests Signed-off-by: Oliver Tale-Yazdi * Provide change handler * Add missing BookStateFor::insert and call QueueChangeHandler Signed-off-by: Oliver Tale-Yazdi * Docs Signed-off-by: Oliver Tale-Yazdi * Update benchmarks and weights Signed-off-by: Oliver Tale-Yazdi * More tests... Signed-off-by: Oliver Tale-Yazdi * Use weight metering functions Signed-off-by: Oliver Tale-Yazdi * weightInfo::process_message_payload is gone Signed-off-by: Oliver Tale-Yazdi * Add defensive_saturating_accrue Signed-off-by: Oliver Tale-Yazdi * Rename WeightCounter to WeightMeter Ctr+Shift+H should do the trick. Signed-off-by: Oliver Tale-Yazdi * Test on_initialize Signed-off-by: Oliver Tale-Yazdi * Add module docs Signed-off-by: Oliver Tale-Yazdi * Remove origin from MaxMessageLen The message origin is not encoded into the heap and does therefore not influence the max message length anymore. Signed-off-by: Oliver Tale-Yazdi * Add BoundedVec::as_slice Signed-off-by: Oliver Tale-Yazdi * Test Page::{from_message, try_append_message} Signed-off-by: Oliver Tale-Yazdi * Fixup docs Signed-off-by: Oliver Tale-Yazdi * Docs * Do nothing in sweep_queue if the queue does not exist ... otherwise it inserts default values into the storage. Signed-off-by: Oliver Tale-Yazdi * Test ring (un)knitting Signed-off-by: Oliver Tale-Yazdi * Upgrade stress-test Change the test to not assume that all queued messages will be processed in the next block but split it over multiple. Signed-off-by: Oliver Tale-Yazdi * More tests... Signed-off-by: Oliver Tale-Yazdi * Beauty fixes Signed-off-by: Oliver Tale-Yazdi * clippy Signed-off-by: Oliver Tale-Yazdi * Rename BoundedVec::as_slice to as_bounded_slice Conflicts with deref().as_slice() otherwise. Signed-off-by: Oliver Tale-Yazdi * Fix imports Signed-off-by: Oliver Tale-Yazdi * Remove ReadyRing struct Was used for testing only. Instead use 'fn assert_ring' which also check the service head and backlinks. Signed-off-by: Oliver Tale-Yazdi * Beauty fixes Signed-off-by: Oliver Tale-Yazdi * Fix stale page watermark Signed-off-by: Oliver Tale-Yazdi * Cleanup Signed-off-by: Oliver Tale-Yazdi * Fix test feature and clippy Signed-off-by: Oliver Tale-Yazdi * QueueChanged handler is called correctly Signed-off-by: Oliver Tale-Yazdi * Update benches Signed-off-by: Oliver Tale-Yazdi * Abstract testing functions Signed-off-by: Oliver Tale-Yazdi * More tests Signed-off-by: Oliver Tale-Yazdi * Cleanup Signed-off-by: Oliver Tale-Yazdi * Clippy Signed-off-by: Oliver Tale-Yazdi * fmt Signed-off-by: Oliver Tale-Yazdi * Simplify tests Signed-off-by: Oliver Tale-Yazdi * Make stuff compile Signed-off-by: Oliver Tale-Yazdi * Extend overweight execution benchmark Signed-off-by: Oliver Tale-Yazdi * Remove TODOs Signed-off-by: Oliver Tale-Yazdi * Test service queue with faulty MessageProcessor Signed-off-by: Oliver Tale-Yazdi * fmt Signed-off-by: Oliver Tale-Yazdi * Update pallet ui tests to 1.65 Signed-off-by: Oliver Tale-Yazdi * More docs Signed-off-by: Oliver Tale-Yazdi * Review doc fixes Co-authored-by: Robert Klotzner Signed-off-by: Oliver Tale-Yazdi * Add weight_limit to extrinsic weight of execute_overweight * Correctly return unused weight * Return actual weight consumed in do_execute_overweight * Review fixes Signed-off-by: Oliver Tale-Yazdi * Set version 7.0.0-dev Signed-off-by: Oliver Tale-Yazdi * Make it compile Signed-off-by: Oliver Tale-Yazdi * Switch message_size to u64 Signed-off-by: Oliver Tale-Yazdi * Switch message_count to u64 Signed-off-by: Oliver Tale-Yazdi * Fix benchmarks Signed-off-by: Oliver Tale-Yazdi * Make CI green Signed-off-by: Oliver Tale-Yazdi * Docs * Update tests Signed-off-by: Oliver Tale-Yazdi * ".git/.scripts/bench-bot.sh" pallet dev pallet_message_queue * Dont mention README.md in the Cargo.toml Signed-off-by: Oliver Tale-Yazdi * Remove reference to readme Signed-off-by: Oliver Tale-Yazdi Co-authored-by: Oliver Tale-Yazdi Co-authored-by: parity-processbot <> Co-authored-by: Robert Klotzner Co-authored-by: Keith Yeung --- Cargo.lock | 40 +- Cargo.toml | 1 + bin/node/runtime/Cargo.toml | 4 + bin/node/runtime/src/lib.rs | 21 + frame/message-queue/Cargo.toml | 53 + frame/message-queue/src/benchmarking.rs | 204 +++ frame/message-queue/src/integration_test.rs | 224 +++ frame/message-queue/src/lib.rs | 1308 +++++++++++++++++ frame/message-queue/src/mock.rs | 312 ++++ frame/message-queue/src/mock_helpers.rs | 185 +++ frame/message-queue/src/tests.rs | 1092 ++++++++++++++ frame/message-queue/src/weights.rs | 216 +++ frame/scheduler/Cargo.toml | 2 + frame/scheduler/src/lib.rs | 3 +- frame/support/src/traits.rs | 6 + frame/support/src/traits/messages.rs | 202 +++ ...age_ensure_span_are_ok_on_wrong_gen.stderr | 6 +- ...re_span_are_ok_on_wrong_gen_unnamed.stderr | 6 +- primitives/core/src/bounded/bounded_vec.rs | 7 + primitives/weights/src/weight_meter.rs | 6 + 20 files changed, 3883 insertions(+), 15 deletions(-) create mode 100644 frame/message-queue/Cargo.toml create mode 100644 frame/message-queue/src/benchmarking.rs create mode 100644 frame/message-queue/src/integration_test.rs create mode 100644 frame/message-queue/src/lib.rs create mode 100644 frame/message-queue/src/mock.rs create mode 100644 frame/message-queue/src/mock_helpers.rs create mode 100644 frame/message-queue/src/tests.rs create mode 100644 frame/message-queue/src/weights.rs create mode 100644 frame/support/src/traits/messages.rs diff --git a/Cargo.lock b/Cargo.lock index 73effefc48da1..41c641cf05963 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3117,6 +3117,7 @@ dependencies = [ "pallet-indices", "pallet-lottery", "pallet-membership", + "pallet-message-queue", "pallet-mmr", "pallet-multisig", "pallet-nis", @@ -5322,6 +5323,28 @@ dependencies = [ "sp-std", ] +[[package]] +name = "pallet-message-queue" +version = "7.0.0-dev" +dependencies = [ + "frame-benchmarking", + "frame-support", + "frame-system", + "log", + "parity-scale-codec", + "rand 0.8.5", + "rand_distr", + "scale-info", + "serde", + "sp-arithmetic", + "sp-core", + "sp-io", + "sp-runtime", + "sp-std", + "sp-tracing", + "sp-weights", +] + [[package]] name = "pallet-mmr" version = "4.0.0-dev" @@ -5709,6 +5732,7 @@ dependencies = [ "sp-io", "sp-runtime", "sp-std", + "sp-weights", "substrate-test-utils", ] @@ -8397,9 +8421,9 @@ dependencies = [ [[package]] name = "scale-info" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8980cafbe98a7ee7a9cc16b32ebce542c77883f512d83fbf2ddc8f6a85ea74c9" +checksum = "333af15b02563b8182cd863f925bd31ef8fa86a0e095d30c091956057d436153" dependencies = [ "bitvec", "cfg-if", @@ -8411,9 +8435,9 @@ dependencies = [ [[package]] name = "scale-info-derive" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4260c630e8a8a33429d1688eff2f163f24c65a4e1b1578ef6b565061336e4b6f" +checksum = "53f56acbd0743d29ffa08f911ab5397def774ad01bab3786804cf6ee057fb5e1" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -8570,9 +8594,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.136" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] @@ -8589,9 +8613,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.136" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 12f2ced0d1d03..eb78d5e104486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,6 +121,7 @@ members = [ "frame/offences/benchmarking", "frame/preimage", "frame/proxy", + "frame/message-queue", "frame/nomination-pools", "frame/nomination-pools/fuzzer", "frame/nomination-pools/benchmarking", diff --git a/bin/node/runtime/Cargo.toml b/bin/node/runtime/Cargo.toml index 02a2ae292d83e..477545c9ac332 100644 --- a/bin/node/runtime/Cargo.toml +++ b/bin/node/runtime/Cargo.toml @@ -75,6 +75,7 @@ pallet-indices = { version = "4.0.0-dev", default-features = false, path = "../. pallet-identity = { version = "4.0.0-dev", default-features = false, path = "../../../frame/identity" } pallet-lottery = { version = "4.0.0-dev", default-features = false, path = "../../../frame/lottery" } pallet-membership = { version = "4.0.0-dev", default-features = false, path = "../../../frame/membership" } +pallet-message-queue = { version = "7.0.0-dev", default-features = false, path = "../../../frame/message-queue" } pallet-mmr = { version = "4.0.0-dev", default-features = false, path = "../../../frame/merkle-mountain-range" } pallet-multisig = { version = "4.0.0-dev", default-features = false, path = "../../../frame/multisig" } pallet-nomination-pools = { version = "1.0.0", default-features = false, path = "../../../frame/nomination-pools"} @@ -150,6 +151,7 @@ std = [ "sp-inherents/std", "pallet-lottery/std", "pallet-membership/std", + "pallet-message-queue/std", "pallet-mmr/std", "pallet-multisig/std", "pallet-nomination-pools/std", @@ -229,6 +231,7 @@ runtime-benchmarks = [ "pallet-indices/runtime-benchmarks", "pallet-lottery/runtime-benchmarks", "pallet-membership/runtime-benchmarks", + "pallet-message-queue/runtime-benchmarks", "pallet-mmr/runtime-benchmarks", "pallet-multisig/runtime-benchmarks", "pallet-nomination-pools-benchmarking/runtime-benchmarks", @@ -282,6 +285,7 @@ try-runtime = [ "pallet-identity/try-runtime", "pallet-lottery/try-runtime", "pallet-membership/try-runtime", + "pallet-message-queue/try-runtime", "pallet-mmr/try-runtime", "pallet-multisig/try-runtime", "pallet-nomination-pools/try-runtime", diff --git a/bin/node/runtime/src/lib.rs b/bin/node/runtime/src/lib.rs index 1bb4dd6f913a6..7cd42be73a19b 100644 --- a/bin/node/runtime/src/lib.rs +++ b/bin/node/runtime/src/lib.rs @@ -1135,6 +1135,25 @@ impl pallet_bounties::Config for Runtime { type ChildBountyManager = ChildBounties; } +parameter_types! { + /// Allocate at most 20% of each block for message processing. + /// + /// Is set to 20% since the scheduler can already consume a maximum of 80%. + pub MessageQueueServiceWeight: Option = Some(Perbill::from_percent(20) * RuntimeBlockWeights::get().max_block); +} + +impl pallet_message_queue::Config for Runtime { + type RuntimeEvent = RuntimeEvent; + type WeightInfo = (); + /// NOTE: Always set this to `NoopMessageProcessor` for benchmarking. + type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor; + type Size = u32; + type QueueChangeHandler = (); + type HeapSize = ConstU32<{ 64 * 1024 }>; + type MaxStale = ConstU32<128>; + type ServiceWeight = MessageQueueServiceWeight; +} + parameter_types! { pub const ChildBountyValueMinimum: Balance = 1 * DOLLARS; } @@ -1699,6 +1718,7 @@ construct_runtime!( RankedPolls: pallet_referenda::, RankedCollective: pallet_ranked_collective, FastUnstake: pallet_fast_unstake, + MessageQueue: pallet_message_queue, } ); @@ -1793,6 +1813,7 @@ mod benches { [pallet_indices, Indices] [pallet_lottery, Lottery] [pallet_membership, TechnicalMembership] + [pallet_message_queue, MessageQueue] [pallet_mmr, Mmr] [pallet_multisig, Multisig] [pallet_nomination_pools, NominationPoolsBench::] diff --git a/frame/message-queue/Cargo.toml b/frame/message-queue/Cargo.toml new file mode 100644 index 0000000000000..47d114902f52c --- /dev/null +++ b/frame/message-queue/Cargo.toml @@ -0,0 +1,53 @@ +[package] +authors = ["Parity Technologies "] +edition = "2021" +name = "pallet-message-queue" +version = "7.0.0-dev" +license = "Apache-2.0" +homepage = "https://substrate.io" +repository = "https://github.com/paritytech/substrate/" +description = "FRAME pallet to queue and process messages" + +[dependencies] +codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive"] } +scale-info = { version = "2.1.2", default-features = false, features = ["derive"] } +serde = { version = "1.0.137", optional = true, features = ["derive"] } +log = { version = "0.4.17", default-features = false } + +sp-core = { version = "7.0.0", default-features = false, path = "../../primitives/core" } +sp-io = { version = "7.0.0", default-features = false, path = "../../primitives/io" } +sp-runtime = { version = "7.0.0", default-features = false, path = "../../primitives/runtime" } +sp-std = { version = "5.0.0", default-features = false, path = "../../primitives/std" } +sp-arithmetic = { version = "6.0.0", default-features = false, path = "../../primitives/arithmetic" } +sp-weights = { version = "4.0.0", default-features = false, path = "../../primitives/weights" } + +frame-benchmarking = { version = "4.0.0-dev", default-features = false, optional = true, path = "../benchmarking" } +frame-support = { version = "4.0.0-dev", default-features = false, path = "../support" } +frame-system = { version = "4.0.0-dev", default-features = false, path = "../system" } + +[dev-dependencies] +sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" } +rand = "0.8.5" +rand_distr = "0.4.3" + +[features] +default = ["std"] +std = [ + "codec/std", + "scale-info/std", + "sp-core/std", + "sp-io/std", + "sp-runtime/std", + "sp-std/std", + "sp-arithmetic/std", + "sp-weights/std", + "frame-benchmarking?/std", + "frame-support/std", + "frame-system/std", +] +runtime-benchmarks = [ + "frame-benchmarking/runtime-benchmarks", + "frame-support/runtime-benchmarks", + "frame-system/runtime-benchmarks", +] +try-runtime = ["frame-support/try-runtime"] diff --git a/frame/message-queue/src/benchmarking.rs b/frame/message-queue/src/benchmarking.rs new file mode 100644 index 0000000000000..c0ff20431d00e --- /dev/null +++ b/frame/message-queue/src/benchmarking.rs @@ -0,0 +1,204 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Benchmarking for the message queue pallet. + +#![cfg(feature = "runtime-benchmarks")] +#![allow(unused_assignments)] // Needed for `ready_ring_knit`. + +use super::{mock_helpers::*, Pallet as MessageQueue, *}; + +use frame_benchmarking::{benchmarks, whitelisted_caller}; +use frame_support::traits::Get; +use frame_system::RawOrigin; +use sp_std::prelude::*; + +benchmarks! { + where_clause { + where + // NOTE: We need to generate multiple origins, therefore Origin is `From`. The + // `PartialEq` is for asserting the outcome of the ring (un)knitting and *could* be + // removed if really necessary. + <::MessageProcessor as ProcessMessage>::Origin: From + PartialEq, + ::Size: From, + } + + // Worst case path of `ready_ring_knit`. + ready_ring_knit { + let mid: MessageOriginOf:: = 1.into(); + build_ring::(&[0.into(), mid.clone(), 2.into()]); + unknit::(&mid); + assert_ring::(&[0.into(), 2.into()]); + let mut neighbours = None; + }: { + neighbours = MessageQueue::::ready_ring_knit(&mid).ok(); + } verify { + // The neighbours needs to be modified manually. + BookStateFor::::mutate(&mid, |b| { b.ready_neighbours = neighbours }); + assert_ring::(&[0.into(), 2.into(), mid]); + } + + // Worst case path of `ready_ring_unknit`. + ready_ring_unknit { + build_ring::(&[0.into(), 1.into(), 2.into()]); + assert_ring::(&[0.into(), 1.into(), 2.into()]); + let o: MessageOriginOf:: = 0.into(); + let neighbours = BookStateFor::::get(&o).ready_neighbours.unwrap(); + }: { + MessageQueue::::ready_ring_unknit(&o, neighbours); + } verify { + assert_ring::(&[1.into(), 2.into()]); + } + + // `service_queues` without any queue processing. + service_queue_base { + }: { + MessageQueue::::service_queue(0.into(), &mut WeightMeter::max_limit(), Weight::MAX) + } + + // `service_page` without any message processing but with page completion. + service_page_base_completion { + let origin: MessageOriginOf = 0.into(); + let page = PageOf::::default(); + Pages::::insert(&origin, 0, &page); + let mut book_state = single_page_book::(); + let mut meter = WeightMeter::max_limit(); + let limit = Weight::MAX; + }: { + MessageQueue::::service_page(&origin, &mut book_state, &mut meter, limit) + } + + // `service_page` without any message processing and without page completion. + service_page_base_no_completion { + let origin: MessageOriginOf = 0.into(); + let mut page = PageOf::::default(); + // Mock the storage such that `is_complete` returns `false` but `peek_first` returns `None`. + page.first = 1.into(); + page.remaining = 1.into(); + Pages::::insert(&origin, 0, &page); + let mut book_state = single_page_book::(); + let mut meter = WeightMeter::max_limit(); + let limit = Weight::MAX; + }: { + MessageQueue::::service_page(&origin, &mut book_state, &mut meter, limit) + } + + // Processing a single message from a page. + service_page_item { + let msg = vec![1u8; MaxMessageLenOf::::get() as usize]; + let mut page = page::(&msg.clone()); + let mut book = book_for::(&page); + assert!(page.peek_first().is_some(), "There is one message"); + let mut weight = WeightMeter::max_limit(); + }: { + let status = MessageQueue::::service_page_item(&0u32.into(), 0, &mut book, &mut page, &mut weight, Weight::MAX); + assert_eq!(status, ItemExecutionStatus::Executed(true)); + } verify { + // Check that it was processed. + assert_last_event::(Event::Processed { + hash: T::Hashing::hash(&msg), origin: 0.into(), + weight_used: 1.into_weight(), success: true + }.into()); + let (_, processed, _) = page.peek_index(0).unwrap(); + assert!(processed); + assert_eq!(book.message_count, 0); + } + + // Worst case for calling `bump_service_head`. + bump_service_head { + setup_bump_service_head::(0.into(), 10.into()); + let mut weight = WeightMeter::max_limit(); + }: { + MessageQueue::::bump_service_head(&mut weight); + } verify { + assert_eq!(ServiceHead::::get().unwrap(), 10u32.into()); + assert_eq!(weight.consumed, T::WeightInfo::bump_service_head()); + } + + reap_page { + // Mock the storage to get a *cullable* but not *reapable* page. + let origin: MessageOriginOf = 0.into(); + let mut book = single_page_book::(); + let (page, msgs) = full_page::(); + + for p in 0 .. T::MaxStale::get() * T::MaxStale::get() { + if p == 0 { + Pages::::insert(&origin, p, &page); + } + book.end += 1; + book.count += 1; + book.message_count += msgs as u64; + book.size += page.remaining_size.into() as u64; + } + book.begin = book.end - T::MaxStale::get(); + BookStateFor::::insert(&origin, &book); + assert!(Pages::::contains_key(&origin, 0)); + + }: _(RawOrigin::Signed(whitelisted_caller()), 0u32.into(), 0) + verify { + assert_last_event::(Event::PageReaped{ origin: 0.into(), index: 0 }.into()); + assert!(!Pages::::contains_key(&origin, 0)); + } + + // Worst case for `execute_overweight` where the page is removed as completed. + // + // The worst case occurs when executing the last message in a page of which all are skipped since it is using `peek_index` which has linear complexities. + execute_overweight_page_removed { + let origin: MessageOriginOf = 0.into(); + let (mut page, msgs) = full_page::(); + // Skip all messages. + for _ in 1..msgs { + page.skip_first(true); + } + page.skip_first(false); + let book = book_for::(&page); + Pages::::insert(&origin, 0, &page); + BookStateFor::::insert(&origin, &book); + }: { + MessageQueue::::execute_overweight(RawOrigin::Signed(whitelisted_caller()).into(), 0u32.into(), 0u32, ((msgs - 1) as u32).into(), Weight::MAX).unwrap() + } + verify { + assert_last_event::(Event::Processed { + hash: T::Hashing::hash(&((msgs - 1) as u32).encode()), origin: 0.into(), + weight_used: Weight::from_parts(1, 1), success: true + }.into()); + assert!(!Pages::::contains_key(&origin, 0), "Page must be removed"); + } + + // Worst case for `execute_overweight` where the page is updated. + execute_overweight_page_updated { + let origin: MessageOriginOf = 0.into(); + let (mut page, msgs) = full_page::(); + // Skip all messages. + for _ in 0..msgs { + page.skip_first(false); + } + let book = book_for::(&page); + Pages::::insert(&origin, 0, &page); + BookStateFor::::insert(&origin, &book); + }: { + MessageQueue::::execute_overweight(RawOrigin::Signed(whitelisted_caller()).into(), 0u32.into(), 0u32, ((msgs - 1) as u32).into(), Weight::MAX).unwrap() + } + verify { + assert_last_event::(Event::Processed { + hash: T::Hashing::hash(&((msgs - 1) as u32).encode()), origin: 0.into(), + weight_used: Weight::from_parts(1, 1), success: true + }.into()); + assert!(Pages::::contains_key(&origin, 0), "Page must be updated"); + } + + impl_benchmark_test_suite!(MessageQueue, crate::mock::new_test_ext::(), crate::integration_test::Test); +} diff --git a/frame/message-queue/src/integration_test.rs b/frame/message-queue/src/integration_test.rs new file mode 100644 index 0000000000000..a9b6ee9bd2214 --- /dev/null +++ b/frame/message-queue/src/integration_test.rs @@ -0,0 +1,224 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Stress tests pallet-message-queue. Defines its own runtime config to use larger constants for +//! `HeapSize` and `MaxStale`. + +#![cfg(test)] + +use crate::{ + mock::{ + new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed, + }, + *, +}; + +use crate as pallet_message_queue; +use frame_support::{ + parameter_types, + traits::{ConstU32, ConstU64}, +}; +use rand::{rngs::StdRng, Rng, SeedableRng}; +use rand_distr::Pareto; +use sp_core::H256; +use sp_runtime::{ + testing::Header, + traits::{BlakeTwo256, IdentityLookup}, +}; + +type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic; +type Block = frame_system::mocking::MockBlock; + +frame_support::construct_runtime!( + pub enum Test where + Block = Block, + NodeBlock = Block, + UncheckedExtrinsic = UncheckedExtrinsic, + { + System: frame_system::{Pallet, Call, Config, Storage, Event}, + MessageQueue: pallet_message_queue::{Pallet, Call, Storage, Event}, + } +); + +parameter_types! { + pub BlockWeights: frame_system::limits::BlockWeights = + frame_system::limits::BlockWeights::simple_max(frame_support::weights::Weight::from_ref_time(1024)); +} +impl frame_system::Config for Test { + type BaseCallFilter = frame_support::traits::Everything; + type BlockWeights = (); + type BlockLength = (); + type DbWeight = (); + type RuntimeOrigin = RuntimeOrigin; + type Index = u64; + type BlockNumber = u64; + type Hash = H256; + type RuntimeCall = RuntimeCall; + type Hashing = BlakeTwo256; + type AccountId = u64; + type Lookup = IdentityLookup; + type Header = Header; + type RuntimeEvent = RuntimeEvent; + type BlockHashCount = ConstU64<250>; + type Version = (); + type PalletInfo = PalletInfo; + type AccountData = (); + type OnNewAccount = (); + type OnKilledAccount = (); + type SystemWeightInfo = (); + type SS58Prefix = (); + type OnSetCode = (); + type MaxConsumers = ConstU32<16>; +} + +parameter_types! { + pub const HeapSize: u32 = 32 * 1024; + pub const MaxStale: u32 = 32; + pub static ServiceWeight: Option = Some(Weight::from_parts(100, 100)); +} + +impl Config for Test { + type RuntimeEvent = RuntimeEvent; + type WeightInfo = MockedWeightInfo; + type MessageProcessor = CountingMessageProcessor; + type Size = u32; + type QueueChangeHandler = (); + type HeapSize = HeapSize; + type MaxStale = MaxStale; + type ServiceWeight = ServiceWeight; +} + +/// Simulates heavy usage by enqueueing and processing large amounts of messages. +/// +/// Best to run with `-r`, `RUST_LOG=info` and `RUSTFLAGS='-Cdebug-assertions=y'`. +/// +/// # Example output +/// +/// ```pre +/// Enqueued 1189 messages across 176 queues. Payload 46.97 KiB +/// Processing 772 of 1189 messages +/// Enqueued 9270 messages across 1559 queues. Payload 131.85 KiB +/// Processing 6262 of 9687 messages +/// Enqueued 5025 messages across 1225 queues. Payload 100.23 KiB +/// Processing 1739 of 8450 messages +/// Enqueued 42061 messages across 6357 queues. Payload 536.29 KiB +/// Processing 11675 of 48772 messages +/// Enqueued 20253 messages across 2420 queues. Payload 288.34 KiB +/// Processing 28711 of 57350 messages +/// Processing all remaining 28639 messages +/// ``` +#[test] +#[ignore] // Only run in the CI. +fn stress_test_enqueue_and_service() { + let blocks = 20; + let max_queues = 10_000; + let max_messages_per_queue = 10_000; + let max_msg_len = MaxMessageLenOf::::get(); + let mut rng = StdRng::seed_from_u64(42); + + new_test_ext::().execute_with(|| { + let mut msgs_remaining = 0; + for _ in 0..blocks { + // Start by enqueuing a large number of messages. + let (enqueued, _) = + enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng); + msgs_remaining += enqueued; + + // Pick a fraction of all messages currently in queue and process them. + let processed = rng.gen_range(1..=msgs_remaining); + log::info!("Processing {} of all messages {}", processed, msgs_remaining); + process_messages(processed); // This also advances the block. + msgs_remaining -= processed; + } + log::info!("Processing all remaining {} messages", msgs_remaining); + process_messages(msgs_remaining); + post_conditions(); + }); +} + +/// Enqueue a random number of random messages into a random number of queues. +fn enqueue_messages( + max_queues: u32, + max_per_queue: u32, + max_msg_len: u32, + rng: &mut StdRng, +) -> (u32, usize) { + let num_queues = rng.gen_range(1..max_queues); + let mut num_messages = 0; + let mut total_msg_len = 0; + for origin in 0..num_queues { + let num_messages_per_queue = + (rng.sample(Pareto::new(1.0, 1.1).unwrap()) as u32).min(max_per_queue); + + for m in 0..num_messages_per_queue { + let mut message = format!("{}:{}", &origin, &m).into_bytes(); + let msg_len = (rng.sample(Pareto::new(1.0, 1.0).unwrap()) as u32) + .clamp(message.len() as u32, max_msg_len); + message.resize(msg_len as usize, 0); + MessageQueue::enqueue_message( + BoundedSlice::defensive_truncate_from(&message), + origin.into(), + ); + total_msg_len += msg_len; + } + num_messages += num_messages_per_queue; + } + log::info!( + "Enqueued {} messages across {} queues. Payload {:.2} KiB", + num_messages, + num_queues, + total_msg_len as f64 / 1024.0 + ); + (num_messages, total_msg_len as usize) +} + +/// Process the number of messages. +fn process_messages(num_msgs: u32) { + let weight = (num_msgs as u64).into_weight(); + ServiceWeight::set(Some(weight)); + let consumed = next_block(); + + assert_eq!(consumed, weight, "\n{}", MessageQueue::debug_info()); + assert_eq!(NumMessagesProcessed::take(), num_msgs as usize); +} + +/// Returns the weight consumed by `MessageQueue::on_initialize()`. +fn next_block() -> Weight { + MessageQueue::on_finalize(System::block_number()); + System::on_finalize(System::block_number()); + System::set_block_number(System::block_number() + 1); + System::on_initialize(System::block_number()); + MessageQueue::on_initialize(System::block_number()) +} + +/// Assert that the pallet is in the expected post state. +fn post_conditions() { + // All queues are empty. + for (_, book) in BookStateFor::::iter() { + assert!(book.end >= book.begin); + assert_eq!(book.count, 0); + assert_eq!(book.size, 0); + assert_eq!(book.message_count, 0); + assert!(book.ready_neighbours.is_none()); + } + // No pages remain. + assert_eq!(Pages::::iter().count(), 0); + // Service head is gone. + assert!(ServiceHead::::get().is_none()); + // This still works fine. + assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero(), "Nothing left"); + next_block(); +} diff --git a/frame/message-queue/src/lib.rs b/frame/message-queue/src/lib.rs new file mode 100644 index 0000000000000..9b976c48245c9 --- /dev/null +++ b/frame/message-queue/src/lib.rs @@ -0,0 +1,1308 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! # Generalized Message Queue Pallet +//! +//! Provides generalized message queuing and processing capabilities on a per-queue basis for +//! arbitrary use-cases. +//! +//! # Design Goals +//! +//! 1. Minimal assumptions about `Message`s and `MessageOrigin`s. Both should be MEL bounded blobs. +//! This ensures the generality and reusability of the pallet. +//! 2. Well known and tightly limited pre-dispatch PoV weights, especially for message execution. +//! This is paramount for the success of the pallet since message execution is done in +//! `on_initialize` which must _never_ under-estimate its PoV weight. It also needs a frugal PoV +//! footprint since PoV is scarce and this is (possibly) done in every block. This must also hold +//! in the presence of unpredictable message size distributions. +//! 3. Usable as XCMP, DMP and UMP message/dispatch queue - possibly through adapter types. +//! +//! # Design +//! +//! The pallet has means to enqueue, store and process messages. This is implemented by having +//! *queues* which store enqueued messages and can be *served* to process said messages. A queue is +//! identified by its origin in the `BookStateFor`. Each message has an origin which defines into +//! which queue it will be stored. Messages are stored by being appended to the last [`Page`] of a +//! book. Each book keeps track of its pages by indexing `Pages`. The `ReadyRing` contains all +//! queues which hold at least one unprocessed message and are thereby *ready* to be serviced. The +//! `ServiceHead` indicates which *ready* queue is the next to be serviced. +//! The pallet implements [`frame_support::traits::EnqueueMessage`], +//! [`frame_support::traits::ServiceQueues`] and has [`frame_support::traits::ProcessMessage`] and +//! [`OnQueueChanged`] hooks to communicate with the outside world. +//! +//! NOTE: The storage items are not linked since they are not public. +//! +//! **Message Execution** +//! +//! Executing a message is offloaded to the [`Config::MessageProcessor`] which contains the actual +//! logic of how to handle the message since they are blobs. A message can be temporarily or +//! permanently overweight. The pallet will perpetually try to execute a temporarily overweight +//! message. A permanently overweight message is skipped and must be executed manually. +//! +//! **Pagination** +//! +//! Queues are stored in a *paged* manner by splitting their messages into [`Page`]s. This results +//! in a lot of complexity when implementing the pallet but is completely necessary to archive the +//! second #[Design Goal](design-goals). The problem comes from the fact a message can *possibly* be +//! quite large, lets say 64KiB. This then results in a *MEL* of at least 64KiB which results in a +//! PoV of at least 64KiB. Now we have the assumption that most messages are much shorter than their +//! maximum allowed length. This would result in most messages having a pre-dispatch PoV size which +//! is much larger than their post-dispatch PoV size, possibly by a factor of thousand. Disregarding +//! this observation would cripple the processing power of the pallet since it cannot straighten out +//! this discrepancy at runtime. Conceptually, the implementation is packing as many messages into a +//! single bounded vec, as actually fit into the bounds. This reduces the wasted PoV. +//! +//! **Page Data Layout** +//! +//! A Page contains a heap which holds all its messages. The heap is built by concatenating +//! `(ItemHeader, Message)` pairs. The [`ItemHeader`] contains the length of the message which is +//! needed for retrieving it. This layout allows for constant access time of the next message and +//! linear access time for any message in the page. The header must remain minimal to reduce its PoV +//! impact. +//! +//! **Weight Metering** +//! +//! The pallet utilizes the [`sp_weights::WeightMeter`] to manually track its consumption to always +//! stay within the required limit. This implies that the message processor hook can calculate the +//! weight of a message without executing it. This restricts the possible use-cases but is necessary +//! since the pallet runs in `on_initialize` which has a hard weight limit. The weight meter is used +//! in a way that `can_accrue` and `check_accrue` are always used to check the remaining weight of +//! an operation before committing to it. The process of exiting due to insufficient weight is +//! termed "bailing". +//! +//! # Scenario: Message enqueuing +//! +//! A message `m` is enqueued for origin `o` into queue `Q[o]` through +//! [`frame_support::traits::EnqueueMessage::enqueue_message`]`(m, o)`. +//! +//! First the queue is either loaded if it exists or otherwise created with empty default values. +//! The message is then inserted to the queue by appended it into its last `Page` or by creating a +//! new `Page` just for `m` if it does not fit in there. The number of messages in the `Book` is +//! incremented. +//! +//! `Q[o]` is now *ready* which will eventually result in `m` being processed. +//! +//! # Scenario: Message processing +//! +//! The pallet runs each block in `on_initialize` or when being manually called through +//! [`frame_support::traits::ServiceQueues::service_queues`]. +//! +//! First it tries to "rotate" the `ReadyRing` by one through advancing the `ServiceHead` to the +//! next *ready* queue. It then starts to service this queue by servicing as many pages of it as +//! possible. Servicing a page means to execute as many message of it as possible. Each executed +//! message is marked as *processed* if the [`Config::MessageProcessor`] return Ok. An event +//! [`Event::Processed`] is emitted afterwards. It is possible that the weight limit of the pallet +//! will never allow a specific message to be executed. In this case it remains as unprocessed and +//! is skipped. This process stops if either there are no more messages in the queue or the +//! remaining weight became insufficient to service this queue. If there is enough weight it tries +//! to advance to the next *ready* queue and service it. This continues until there are no more +//! queues on which it can make progress or not enough weight to check that. +//! +//! # Scenario: Overweight execution +//! +//! A permanently over-weight message which was skipped by the message processing will never be +//! executed automatically through `on_initialize` nor by calling +//! [`frame_support::traits::ServiceQueues::service_queues`]. +//! +//! Manual intervention in the form of +//! [`frame_support::traits::ServiceQueues::execute_overweight`] is necessary. Overweight messages +//! emit an [`Event::OverweightEnqueued`] event which can be used to extract the arguments for +//! manual execution. This only works on permanently overweight messages. There is no guarantee that +//! this will work since the message could be part of a stale page and be reaped before execution +//! commences. +//! +//! # Terminology +//! +//! - `Message`: A blob of data into which the pallet has no introspection, defined as +//! [`BoundedSlice>`]. The message length is limited by [`MaxMessageLenOf`] +//! which is calculated from [`Config::HeapSize`] and [`ItemHeader::max_encoded_len()`]. +//! - `MessageOrigin`: A generic *origin* of a message, defined as [`MessageOriginOf`]. The +//! requirements for it are kept minimal to remain as generic as possible. The type is defined in +//! [`frame_support::traits::ProcessMessage::Origin`]. +//! - `Page`: An array of `Message`s, see [`Page`]. Can never be empty. +//! - `Book`: A list of `Page`s, see [`BookState`]. Can be empty. +//! - `Queue`: A `Book` together with an `MessageOrigin` which can be part of the `ReadyRing`. Can +//! be empty. +//! - `ReadyRing`: A double-linked list which contains all *ready* `Queue`s. It chains together the +//! queues via their `ready_neighbours` fields. A `Queue` is *ready* if it contains at least one +//! `Message` which can be processed. Can be empty. +//! - `ServiceHead`: A pointer into the `ReadyRing` to the next `Queue` to be serviced. +//! - (`un`)`processed`: A message is marked as *processed* after it was executed by the pallet. A +//! message which was either: not yet executed or could not be executed remains as `unprocessed` +//! which is the default state for a message after being enqueued. +//! - `knitting`/`unknitting`: The means of adding or removing a `Queue` from the `ReadyRing`. +//! - `MEL`: The Max Encoded Length of a type, see [`codec::MaxEncodedLen`]. +//! +//! # Properties +//! +//! **Liveness - Enqueueing** +//! +//! It is always possible to enqueue any message for any `MessageOrigin`. +//! +//! **Liveness - Processing** +//! +//! `on_initialize` always respects its finite weight-limit. +//! +//! **Progress - Enqueueing** +//! +//! An enqueued message immediately becomes *unprocessed* and thereby eligible for execution. +//! +//! **Progress - Processing** +//! +//! The pallet will execute at least one unprocessed message per block, if there is any. Ensuring +//! this property needs careful consideration of the concrete weights, since it is possible that the +//! weight limit of `on_initialize` never allows for the execution of even one message; trivially if +//! the limit is set to zero. `integrity_test` can be used to ensure that this property holds. +//! +//! **Fairness - Enqueuing** +//! +//! Enqueueing a message for a specific `MessageOrigin` does not influence the ability to enqueue a +//! message for the same of any other `MessageOrigin`; guaranteed by **Liveness - Enqueueing**. +//! +//! **Fairness - Processing** +//! +//! The average amount of weight available for message processing is the same for each queue if the +//! number of queues is constant. Creating a new queue must therefore be, possibly economically, +//! expensive. Currently this is archived by having one queue per para-chain/thread, which keeps the +//! number of queues within `O(n)` and should be "good enough". + +#![cfg_attr(not(feature = "std"), no_std)] + +mod benchmarking; +mod integration_test; +mod mock; +pub mod mock_helpers; +mod tests; +pub mod weights; + +use codec::{Codec, Decode, Encode, MaxEncodedLen}; +use frame_support::{ + defensive, + pallet_prelude::*, + traits::{ + DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage, + ProcessMessageError, ServiceQueues, + }, + BoundedSlice, CloneNoBound, DefaultNoBound, +}; +use frame_system::pallet_prelude::*; +pub use pallet::*; +use scale_info::TypeInfo; +use sp_arithmetic::traits::{BaseArithmetic, Unsigned}; +use sp_runtime::{ + traits::{Hash, One, Zero}, + SaturatedConversion, Saturating, +}; +use sp_std::{fmt::Debug, ops::Deref, prelude::*, vec}; +use sp_weights::WeightMeter; +pub use weights::WeightInfo; + +/// Type for identifying a page. +type PageIndex = u32; + +/// Data encoded and prefixed to the encoded `MessageItem`. +#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)] +pub struct ItemHeader { + /// The length of this item, not including the size of this header. The next item of the page + /// follows immediately after the payload of this item. + payload_len: Size, + /// Whether this item has been processed. + is_processed: bool, +} + +/// A page of messages. Pages always contain at least one item. +#[derive( + CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen, +)] +#[scale_info(skip_type_params(HeapSize))] +#[codec(mel_bound(Size: MaxEncodedLen))] +pub struct Page + Debug + Clone + Default, HeapSize: Get> { + /// Messages remaining to be processed; this includes overweight messages which have been + /// skipped. + remaining: Size, + /// The size of all remaining messages to be processed. + /// + /// Includes overweight messages outside of the `first` to `last` window. + remaining_size: Size, + /// The number of items before the `first` item in this page. + first_index: Size, + /// The heap-offset of the header of the first message item in this page which is ready for + /// processing. + first: Size, + /// The heap-offset of the header of the last message item in this page. + last: Size, + /// The heap. If `self.offset == self.heap.len()` then the page is empty and should be deleted. + heap: BoundedVec>, +} + +impl< + Size: BaseArithmetic + Unsigned + Copy + Into + Codec + MaxEncodedLen + Debug + Default, + HeapSize: Get, + > Page +{ + /// Create a [`Page`] from one unprocessed message. + fn from_message(message: BoundedSlice>) -> Self { + let payload_len = message.len(); + let data_len = ItemHeader::::max_encoded_len().saturating_add(payload_len); + let payload_len = payload_len.saturated_into(); + let header = ItemHeader:: { payload_len, is_processed: false }; + + let mut heap = Vec::with_capacity(data_len); + header.using_encoded(|h| heap.extend_from_slice(h)); + heap.extend_from_slice(message.deref()); + + Page { + remaining: One::one(), + remaining_size: payload_len, + first_index: Zero::zero(), + first: Zero::zero(), + last: Zero::zero(), + heap: BoundedVec::defensive_truncate_from(heap), + } + } + + /// Try to append one message to a page. + fn try_append_message( + &mut self, + message: BoundedSlice>, + ) -> Result<(), ()> { + let pos = self.heap.len(); + let payload_len = message.len(); + let data_len = ItemHeader::::max_encoded_len().saturating_add(payload_len); + let payload_len = payload_len.saturated_into(); + let header = ItemHeader:: { payload_len, is_processed: false }; + let heap_size: u32 = HeapSize::get().into(); + if (heap_size as usize).saturating_sub(self.heap.len()) < data_len { + // Can't fit. + return Err(()) + } + + let mut heap = sp_std::mem::take(&mut self.heap).into_inner(); + header.using_encoded(|h| heap.extend_from_slice(h)); + heap.extend_from_slice(message.deref()); + self.heap = BoundedVec::defensive_truncate_from(heap); + self.last = pos.saturated_into(); + self.remaining.saturating_inc(); + self.remaining_size.saturating_accrue(payload_len); + Ok(()) + } + + /// Returns the first message in the page without removing it. + /// + /// SAFETY: Does not panic even on corrupted storage. + fn peek_first(&self) -> Option>> { + if self.first > self.last { + return None + } + let f = (self.first.into() as usize).min(self.heap.len()); + let mut item_slice = &self.heap[f..]; + if let Ok(h) = ItemHeader::::decode(&mut item_slice) { + let payload_len = h.payload_len.into() as usize; + if payload_len <= item_slice.len() { + // impossible to truncate since is sliced up from `self.heap: BoundedVec` + return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len])) + } + } + defensive!("message-queue: heap corruption"); + None + } + + /// Point `first` at the next message, marking the first as processed if `is_processed` is true. + fn skip_first(&mut self, is_processed: bool) { + let f = (self.first.into() as usize).min(self.heap.len()); + if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) { + if is_processed && !h.is_processed { + h.is_processed = true; + h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d)); + self.remaining.saturating_dec(); + self.remaining_size.saturating_reduce(h.payload_len); + } + self.first + .saturating_accrue(ItemHeader::::max_encoded_len().saturated_into()); + self.first.saturating_accrue(h.payload_len); + self.first_index.saturating_inc(); + } + } + + /// Return the message with index `index` in the form of `(position, processed, message)`. + fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> { + let mut pos = 0; + let mut item_slice = &self.heap[..]; + let header_len: usize = ItemHeader::::max_encoded_len().saturated_into(); + for _ in 0..index { + let h = ItemHeader::::decode(&mut item_slice).ok()?; + let item_len = h.payload_len.into() as usize; + if item_slice.len() < item_len { + return None + } + item_slice = &item_slice[item_len..]; + pos.saturating_accrue(header_len.saturating_add(item_len)); + } + let h = ItemHeader::::decode(&mut item_slice).ok()?; + if item_slice.len() < h.payload_len.into() as usize { + return None + } + item_slice = &item_slice[..h.payload_len.into() as usize]; + Some((pos, h.is_processed, item_slice)) + } + + /// Set the `is_processed` flag for the item at `pos` to be `true` if not already and decrement + /// the `remaining` counter of the page. + /// + /// Does nothing if no [`ItemHeader`] could be decoded at the given position. + fn note_processed_at_pos(&mut self, pos: usize) { + if let Ok(mut h) = ItemHeader::::decode(&mut &self.heap[pos..]) { + if !h.is_processed { + h.is_processed = true; + h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d)); + self.remaining.saturating_dec(); + self.remaining_size.saturating_reduce(h.payload_len); + } + } + } + + /// Returns whether the page is *complete* which means that no messages remain. + fn is_complete(&self) -> bool { + self.remaining.is_zero() + } +} + +/// A single link in the double-linked Ready Ring list. +#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)] +pub struct Neighbours { + /// The previous queue. + prev: MessageOrigin, + /// The next queue. + next: MessageOrigin, +} + +/// The state of a queue as represented by a book of its pages. +/// +/// Each queue has exactly one book which holds all of its pages. All pages of a book combined +/// contain all of the messages of its queue; hence the name *Book*. +/// Books can be chained together in a double-linked fashion through their `ready_neighbours` field. +#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)] +pub struct BookState { + /// The first page with some items to be processed in it. If this is `>= end`, then there are + /// no pages with items to be processing in them. + begin: PageIndex, + /// One more than the last page with some items to be processed in it. + end: PageIndex, + /// The number of pages stored at present. + /// + /// This might be larger than `end-begin`, because we keep pages with unprocessed overweight + /// messages outside of the end/begin window. + count: PageIndex, + /// If this book has any ready pages, then this will be `Some` with the previous and next + /// neighbours. This wraps around. + ready_neighbours: Option>, + /// The number of unprocessed messages stored at present. + message_count: u64, + /// The total size of all unprocessed messages stored at present. + size: u64, +} + +impl Default for BookState { + fn default() -> Self { + Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 } + } +} + +/// Handler code for when the items in a queue change. +pub trait OnQueueChanged { + /// Note that the queue `id` now has `item_count` items in it, taking up `items_size` bytes. + fn on_queue_changed(id: Id, items_count: u64, items_size: u64); +} + +impl OnQueueChanged for () { + fn on_queue_changed(_: Id, _: u64, _: u64) {} +} + +#[frame_support::pallet] +pub mod pallet { + use super::*; + + #[pallet::pallet] + #[pallet::generate_store(pub(super) trait Store)] + pub struct Pallet(_); + + /// The module configuration trait. + #[pallet::config] + pub trait Config: frame_system::Config { + /// The overarching event type. + type RuntimeEvent: From> + IsType<::RuntimeEvent>; + + /// Weight information for extrinsics in this pallet. + type WeightInfo: WeightInfo; + + /// Processor for a message. + /// + /// Must be set to [`mock_helpers::NoopMessageProcessor`] for benchmarking. + /// Other message processors that consumes exactly (1, 1) weight for any give message will + /// work as well. Otherwise the benchmarking will also measure the weight of the message + /// processor, which is not desired. + type MessageProcessor: ProcessMessage; + + /// Page/heap size type. + type Size: BaseArithmetic + + Unsigned + + Copy + + Into + + Member + + Encode + + Decode + + MaxEncodedLen + + TypeInfo + + Default; + + /// Code to be called when a message queue changes - either with items introduced or + /// removed. + type QueueChangeHandler: OnQueueChanged<::Origin>; + + /// The size of the page; this implies the maximum message size which can be sent. + /// + /// A good value depends on the expected message sizes, their weights, the weight that is + /// available for processing them and the maximal needed message size. The maximal message + /// size is slightly lower than this as defined by [`MaxMessageLenOf`]. + #[pallet::constant] + type HeapSize: Get; + + /// The maximum number of stale pages (i.e. of overweight messages) allowed before culling + /// can happen. Once there are more stale pages than this, then historical pages may be + /// dropped, even if they contain unprocessed overweight messages. + #[pallet::constant] + type MaxStale: Get; + + /// The amount of weight (if any) which should be provided to the message queue for + /// servicing enqueued items. + /// + /// This may be legitimately `None` in the case that you will call + /// `ServiceQueues::service_queues` manually. + #[pallet::constant] + type ServiceWeight: Get>; + } + + #[pallet::event] + #[pallet::generate_deposit(pub(super) fn deposit_event)] + pub enum Event { + /// Message discarded due to an inability to decode the item. Usually caused by state + /// corruption. + Discarded { hash: T::Hash }, + /// Message discarded due to an error in the `MessageProcessor` (usually a format error). + ProcessingFailed { hash: T::Hash, origin: MessageOriginOf, error: ProcessMessageError }, + /// Message is processed. + Processed { hash: T::Hash, origin: MessageOriginOf, weight_used: Weight, success: bool }, + /// Message placed in overweight queue. + OverweightEnqueued { + hash: T::Hash, + origin: MessageOriginOf, + page_index: PageIndex, + message_index: T::Size, + }, + /// This page was reaped. + PageReaped { origin: MessageOriginOf, index: PageIndex }, + } + + #[pallet::error] + pub enum Error { + /// Page is not reapable because it has items remaining to be processed and is not old + /// enough. + NotReapable, + /// Page to be reaped does not exist. + NoPage, + /// The referenced message could not be found. + NoMessage, + /// The message was already processed and cannot be processed again. + AlreadyProcessed, + /// The message is queued for future execution. + Queued, + /// There is temporarily not enough weight to continue servicing messages. + InsufficientWeight, + } + + /// The index of the first and last (non-empty) pages. + #[pallet::storage] + pub(super) type BookStateFor = + StorageMap<_, Twox64Concat, MessageOriginOf, BookState>, ValueQuery>; + + /// The origin at which we should begin servicing. + #[pallet::storage] + pub(super) type ServiceHead = StorageValue<_, MessageOriginOf, OptionQuery>; + + /// The map of page indices to pages. + #[pallet::storage] + pub(super) type Pages = StorageDoubleMap< + _, + Twox64Concat, + MessageOriginOf, + Twox64Concat, + PageIndex, + Page, + OptionQuery, + >; + + #[pallet::hooks] + impl Hooks> for Pallet { + fn on_initialize(_n: BlockNumberFor) -> Weight { + if let Some(weight_limit) = T::ServiceWeight::get() { + Self::service_queues(weight_limit) + } else { + Weight::zero() + } + } + + /// Check all assumptions about [`crate::Config`]. + fn integrity_test() { + assert!(!MaxMessageLenOf::::get().is_zero(), "HeapSize too low"); + } + } + + #[pallet::call] + impl Pallet { + /// Remove a page which has no more messages remaining to be processed or is stale. + #[pallet::weight(T::WeightInfo::reap_page())] + pub fn reap_page( + origin: OriginFor, + message_origin: MessageOriginOf, + page_index: PageIndex, + ) -> DispatchResult { + let _ = ensure_signed(origin)?; + Self::do_reap_page(&message_origin, page_index) + } + + /// Execute an overweight message. + /// + /// - `origin`: Must be `Signed`. + /// - `message_origin`: The origin from which the message to be executed arrived. + /// - `page`: The page in the queue in which the message to be executed is sitting. + /// - `index`: The index into the queue of the message to be executed. + /// - `weight_limit`: The maximum amount of weight allowed to be consumed in the execution + /// of the message. + /// + /// Benchmark complexity considerations: O(index + weight_limit). + #[pallet::weight( + T::WeightInfo::execute_overweight_page_updated().max( + T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit) + )] + pub fn execute_overweight( + origin: OriginFor, + message_origin: MessageOriginOf, + page: PageIndex, + index: T::Size, + weight_limit: Weight, + ) -> DispatchResultWithPostInfo { + let _ = ensure_signed(origin)?; + let actual_weight = + Self::do_execute_overweight(message_origin, page, index, weight_limit)?; + Ok(Some(actual_weight).into()) + } + } +} + +/// The status of a page after trying to execute its next message. +#[derive(PartialEq, Debug)] +enum PageExecutionStatus { + /// The execution bailed because there was not enough weight remaining. + Bailed, + /// No more messages could be loaded. This does _not_ imply `page.is_complete()`. + /// + /// The reasons for this status are: + /// - The end of the page is reached but there could still be skipped messages. + /// - The storage is corrupted. + NoMore, +} + +/// The status after trying to execute the next item of a [`Page`]. +#[derive(PartialEq, Debug)] +enum ItemExecutionStatus { + /// The execution bailed because there was not enough weight remaining. + Bailed, + /// The item was not found. + NoItem, + /// Whether the execution of an item resulted in it being processed. + /// + /// One reason for `false` would be permanently overweight. + Executed(bool), +} + +/// The status of an attempt to process a message. +#[derive(PartialEq)] +enum MessageExecutionStatus { + /// There is not enough weight remaining at present. + InsufficientWeight, + /// There will never be enough weight. + Overweight, + /// The message was processed successfully. + Processed, + /// The message was processed and resulted in a permanent error. + Unprocessable, +} + +impl Pallet { + /// Knit `origin` into the ready ring right at the end. + /// + /// Return the two ready ring neighbours of `origin`. + fn ready_ring_knit(origin: &MessageOriginOf) -> Result>, ()> { + if let Some(head) = ServiceHead::::get() { + let mut head_book_state = BookStateFor::::get(&head); + let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?; + let tail = head_neighbours.prev; + head_neighbours.prev = origin.clone(); + head_book_state.ready_neighbours = Some(head_neighbours); + BookStateFor::::insert(&head, head_book_state); + + let mut tail_book_state = BookStateFor::::get(&tail); + let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?; + tail_neighbours.next = origin.clone(); + tail_book_state.ready_neighbours = Some(tail_neighbours); + BookStateFor::::insert(&tail, tail_book_state); + + Ok(Neighbours { next: head, prev: tail }) + } else { + ServiceHead::::put(origin); + Ok(Neighbours { next: origin.clone(), prev: origin.clone() }) + } + } + + fn ready_ring_unknit(origin: &MessageOriginOf, neighbours: Neighbours>) { + if origin == &neighbours.next { + debug_assert!( + origin == &neighbours.prev, + "unknitting from single item ring; outgoing must be only item" + ); + // Service queue empty. + ServiceHead::::kill(); + } else { + BookStateFor::::mutate(&neighbours.next, |book_state| { + if let Some(ref mut n) = book_state.ready_neighbours { + n.prev = neighbours.prev.clone() + } + }); + BookStateFor::::mutate(&neighbours.prev, |book_state| { + if let Some(ref mut n) = book_state.ready_neighbours { + n.next = neighbours.next.clone() + } + }); + if let Some(head) = ServiceHead::::get() { + if &head == origin { + ServiceHead::::put(neighbours.next); + } + } else { + defensive!("`ServiceHead` must be some if there was a ready queue"); + } + } + } + + /// Tries to bump the current `ServiceHead` to the next ready queue. + /// + /// Returns the current head if it got be bumped and `None` otherwise. + fn bump_service_head(weight: &mut WeightMeter) -> Option> { + if !weight.check_accrue(T::WeightInfo::bump_service_head()) { + return None + } + + if let Some(head) = ServiceHead::::get() { + let mut head_book_state = BookStateFor::::get(&head); + if let Some(head_neighbours) = head_book_state.ready_neighbours.take() { + ServiceHead::::put(&head_neighbours.next); + Some(head) + } else { + None + } + } else { + None + } + } + + fn do_enqueue_message( + origin: &MessageOriginOf, + message: BoundedSlice>, + ) { + let mut book_state = BookStateFor::::get(origin); + book_state.message_count.saturating_inc(); + book_state + .size + // This should be payload size, but here the payload *is* the message. + .saturating_accrue(message.len() as u64); + + if book_state.end > book_state.begin { + debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready"); + // Already have a page in progress - attempt to append. + let last = book_state.end - 1; + let mut page = match Pages::::get(origin, last) { + Some(p) => p, + None => { + defensive!("Corruption: referenced page doesn't exist."); + return + }, + }; + if page.try_append_message::(message).is_ok() { + Pages::::insert(origin, last, &page); + BookStateFor::::insert(origin, book_state); + return + } + } else { + debug_assert!( + book_state.ready_neighbours.is_none(), + "Must not be in ready ring if not ready" + ); + // insert into ready queue. + match Self::ready_ring_knit(origin) { + Ok(neighbours) => book_state.ready_neighbours = Some(neighbours), + Err(()) => { + defensive!("Ring state invalid when knitting"); + }, + } + } + // No room on the page or no page - link in a new page. + book_state.end.saturating_inc(); + book_state.count.saturating_inc(); + let page = Page::from_message::(message); + Pages::::insert(origin, book_state.end - 1, page); + // NOTE: `T::QueueChangeHandler` is called by the caller. + BookStateFor::::insert(origin, book_state); + } + + /// Try to execute a single message that was marked as overweight. + /// + /// The `weight_limit` is the weight that can be consumed to execute the message. The base + /// weight of the function it self must be measured by the caller. + pub fn do_execute_overweight( + origin: MessageOriginOf, + page_index: PageIndex, + index: T::Size, + weight_limit: Weight, + ) -> Result> { + let mut book_state = BookStateFor::::get(&origin); + let mut page = Pages::::get(&origin, page_index).ok_or(Error::::NoPage)?; + let (pos, is_processed, payload) = + page.peek_index(index.into() as usize).ok_or(Error::::NoMessage)?; + let payload_len = payload.len() as u64; + ensure!( + page_index < book_state.begin || + (page_index == book_state.begin && pos < page.first.into() as usize), + Error::::Queued + ); + ensure!(!is_processed, Error::::AlreadyProcessed); + use MessageExecutionStatus::*; + let mut weight_counter = WeightMeter::from_limit(weight_limit); + match Self::process_message_payload( + origin.clone(), + page_index, + index, + payload, + &mut weight_counter, + Weight::MAX, + // ^^^ We never recognise it as permanently overweight, since that would result in an + // additional overweight event being deposited. + ) { + Overweight | InsufficientWeight => Err(Error::::InsufficientWeight), + Unprocessable | Processed => { + page.note_processed_at_pos(pos); + book_state.message_count.saturating_dec(); + book_state.size.saturating_reduce(payload_len); + let page_weight = if page.remaining.is_zero() { + debug_assert!( + page.remaining_size.is_zero(), + "no messages remaining; no space taken; qed" + ); + Pages::::remove(&origin, page_index); + debug_assert!(book_state.count >= 1, "page exists, so book must have pages"); + book_state.count.saturating_dec(); + T::WeightInfo::execute_overweight_page_removed() + // no need to consider .first or ready ring since processing an overweight page + // would not alter that state. + } else { + Pages::::insert(&origin, page_index, page); + T::WeightInfo::execute_overweight_page_updated() + }; + BookStateFor::::insert(&origin, &book_state); + T::QueueChangeHandler::on_queue_changed( + origin, + book_state.message_count, + book_state.size, + ); + Ok(weight_counter.consumed.saturating_add(page_weight)) + }, + } + } + + /// Remove a stale page or one which has no more messages remaining to be processed. + fn do_reap_page(origin: &MessageOriginOf, page_index: PageIndex) -> DispatchResult { + let mut book_state = BookStateFor::::get(origin); + // definitely not reapable if the page's index is no less than the `begin`ning of ready + // pages. + ensure!(page_index < book_state.begin, Error::::NotReapable); + + let page = Pages::::get(origin, page_index).ok_or(Error::::NoPage)?; + + // definitely reapable if the page has no messages in it. + let reapable = page.remaining.is_zero(); + + // also reapable if the page index has dropped below our watermark. + let cullable = || { + let total_pages = book_state.count; + let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages); + + // The number of stale pages - i.e. pages which contain unprocessed overweight messages. + // We would prefer to keep these around but will restrict how far into history they can + // extend if we notice that there's too many of them. + // + // We don't know *where* in history these pages are so we use a dynamic formula which + // reduces the historical time horizon as the stale pages pile up and increases it as + // they reduce. + let stale_pages = total_pages - ready_pages; + + // The maximum number of stale pages (i.e. of overweight messages) allowed before + // culling can happen at all. Once there are more stale pages than this, then historical + // pages may be dropped, even if they contain unprocessed overweight messages. + let max_stale = T::MaxStale::get(); + + // The amount beyond the maximum which are being used. If it's not beyond the maximum + // then we exit now since no culling is needed. + let overflow = match stale_pages.checked_sub(max_stale + 1) { + Some(x) => x + 1, + None => return false, + }; + + // The special formula which tells us how deep into index-history we will pages. As + // the overflow is greater (and thus the need to drop items from storage is more urgent) + // this is reduced, allowing a greater range of pages to be culled. + // With a minimum `overflow` (`1`), this returns `max_stale ** 2`, indicating we only + // cull beyond that number of indices deep into history. + // At this overflow increases, our depth reduces down to a limit of `max_stale`. We + // never want to reduce below this since this will certainly allow enough pages to be + // culled in order to bring `overflow` back to zero. + let backlog = (max_stale * max_stale / overflow).max(max_stale); + + let watermark = book_state.begin.saturating_sub(backlog); + page_index < watermark + }; + ensure!(reapable || cullable(), Error::::NotReapable); + + Pages::::remove(origin, page_index); + debug_assert!(book_state.count > 0, "reaping a page implies there are pages"); + book_state.count.saturating_dec(); + book_state.message_count.saturating_reduce(page.remaining.into() as u64); + book_state.size.saturating_reduce(page.remaining_size.into() as u64); + BookStateFor::::insert(origin, &book_state); + T::QueueChangeHandler::on_queue_changed( + origin.clone(), + book_state.message_count, + book_state.size, + ); + Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index }); + + Ok(()) + } + + /// Execute any messages remaining to be processed in the queue of `origin`, using up to + /// `weight_limit` to do so. Any messages which would take more than `overweight_limit` to + /// execute are deemed overweight and ignored. + fn service_queue( + origin: MessageOriginOf, + weight: &mut WeightMeter, + overweight_limit: Weight, + ) -> (bool, Option>) { + if !weight.check_accrue( + T::WeightInfo::service_queue_base().saturating_add(T::WeightInfo::ready_ring_unknit()), + ) { + return (false, None) + } + + let mut book_state = BookStateFor::::get(&origin); + let mut total_processed = 0; + + while book_state.end > book_state.begin { + let (processed, status) = + Self::service_page(&origin, &mut book_state, weight, overweight_limit); + total_processed.saturating_accrue(processed); + match status { + // Store the page progress and do not go to the next one. + PageExecutionStatus::Bailed => break, + // Go to the next page if this one is at the end. + PageExecutionStatus::NoMore => (), + }; + book_state.begin.saturating_inc(); + } + let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone()); + if book_state.begin >= book_state.end && total_processed > 0 { + // No longer ready - unknit. + if let Some(neighbours) = book_state.ready_neighbours.take() { + Self::ready_ring_unknit(&origin, neighbours); + } else { + defensive!("Freshly processed queue must have been ready"); + } + } + BookStateFor::::insert(&origin, &book_state); + if total_processed > 0 { + T::QueueChangeHandler::on_queue_changed( + origin, + book_state.message_count, + book_state.size, + ); + } + (total_processed > 0, next_ready) + } + + /// Service as many messages of a page as possible. + /// + /// Returns how many messages were processed and the page's status. + fn service_page( + origin: &MessageOriginOf, + book_state: &mut BookStateOf, + weight: &mut WeightMeter, + overweight_limit: Weight, + ) -> (u32, PageExecutionStatus) { + use PageExecutionStatus::*; + if !weight.check_accrue( + T::WeightInfo::service_page_base_completion() + .max(T::WeightInfo::service_page_base_no_completion()), + ) { + return (0, Bailed) + } + + let page_index = book_state.begin; + let mut page = match Pages::::get(origin, page_index) { + Some(p) => p, + None => { + defensive!("message-queue: referenced page not found"); + return (0, NoMore) + }, + }; + + let mut total_processed = 0; + + // Execute as many messages as possible. + let status = loop { + use ItemExecutionStatus::*; + match Self::service_page_item( + origin, + page_index, + book_state, + &mut page, + weight, + overweight_limit, + ) { + Bailed => break PageExecutionStatus::Bailed, + NoItem => break PageExecutionStatus::NoMore, + // Keep going as long as we make progress... + Executed(true) => total_processed.saturating_inc(), + Executed(false) => (), + } + }; + + if page.is_complete() { + debug_assert!(status != Bailed, "we never bail if a page became complete"); + Pages::::remove(origin, page_index); + debug_assert!(book_state.count > 0, "completing a page implies there are pages"); + book_state.count.saturating_dec(); + } else { + Pages::::insert(origin, page_index, page); + } + (total_processed, status) + } + + /// Execute the next message of a page. + pub(crate) fn service_page_item( + origin: &MessageOriginOf, + page_index: PageIndex, + book_state: &mut BookStateOf, + page: &mut PageOf, + weight: &mut WeightMeter, + overweight_limit: Weight, + ) -> ItemExecutionStatus { + // This ugly pre-checking is needed for the invariant + // "we never bail if a page became complete". + if page.is_complete() { + return ItemExecutionStatus::NoItem + } + if !weight.check_accrue(T::WeightInfo::service_page_item()) { + return ItemExecutionStatus::Bailed + } + + let payload = &match page.peek_first() { + Some(m) => m, + None => return ItemExecutionStatus::NoItem, + }[..]; + + use MessageExecutionStatus::*; + let is_processed = match Self::process_message_payload( + origin.clone(), + page_index, + page.first_index, + payload.deref(), + weight, + overweight_limit, + ) { + InsufficientWeight => return ItemExecutionStatus::Bailed, + Processed | Unprocessable => true, + Overweight => false, + }; + + if is_processed { + book_state.message_count.saturating_dec(); + book_state.size.saturating_reduce(payload.len() as u64); + } + page.skip_first(is_processed); + ItemExecutionStatus::Executed(is_processed) + } + + /// Print the pages in each queue and the messages in each page. + /// + /// Processed messages are prefixed with a `*` and the current `begin`ning page with a `>`. + /// + /// # Example output + /// + /// ```text + /// queue Here: + /// page 0: [] + /// > page 1: [] + /// page 2: ["\0weight=4", "\0c", ] + /// page 3: ["\0bigbig 1", ] + /// page 4: ["\0bigbig 2", ] + /// page 5: ["\0bigbig 3", ] + /// ``` + #[cfg(feature = "std")] + pub fn debug_info() -> String { + let mut info = String::new(); + for (origin, book_state) in BookStateFor::::iter() { + let mut queue = format!("queue {:?}:\n", &origin); + let mut pages = Pages::::iter_prefix(&origin).collect::>(); + pages.sort_by(|(a, _), (b, _)| a.cmp(b)); + for (page_index, mut page) in pages.into_iter() { + let page_info = if book_state.begin == page_index { ">" } else { " " }; + let mut page_info = format!( + "{} page {} ({:?} first, {:?} last, {:?} remain): [ ", + page_info, page_index, page.first, page.last, page.remaining + ); + for i in 0..u32::MAX { + if let Some((_, processed, message)) = + page.peek_index(i.try_into().expect("std-only code")) + { + let msg = String::from_utf8_lossy(message.deref()); + if processed { + page_info.push('*'); + } + page_info.push_str(&format!("{:?}, ", msg)); + page.skip_first(true); + } else { + break + } + } + page_info.push_str("]\n"); + queue.push_str(&page_info); + } + info.push_str(&queue); + } + info + } + + /// Process a single message. + /// + /// The base weight of this function needs to be accounted for by the caller. `weight` is the + /// remaining weight to process the message. `overweight_limit` is the maximum weight that a + /// message can ever consume. Messages above this limit are marked as permanently overweight. + fn process_message_payload( + origin: MessageOriginOf, + page_index: PageIndex, + message_index: T::Size, + message: &[u8], + weight: &mut WeightMeter, + overweight_limit: Weight, + ) -> MessageExecutionStatus { + let hash = T::Hashing::hash(message); + use ProcessMessageError::Overweight; + match T::MessageProcessor::process_message(message, origin.clone(), weight.remaining()) { + Err(Overweight(w)) if w.any_gt(overweight_limit) => { + // Permanently overweight. + Self::deposit_event(Event::::OverweightEnqueued { + hash, + origin, + page_index, + message_index, + }); + MessageExecutionStatus::Overweight + }, + Err(Overweight(_)) => { + // Temporarily overweight - save progress and stop processing this + // queue. + MessageExecutionStatus::InsufficientWeight + }, + Err(error) => { + // Permanent error - drop + Self::deposit_event(Event::::ProcessingFailed { hash, origin, error }); + MessageExecutionStatus::Unprocessable + }, + Ok((success, weight_used)) => { + // Success + weight.defensive_saturating_accrue(weight_used); + let event = Event::::Processed { hash, origin, weight_used, success }; + Self::deposit_event(event); + MessageExecutionStatus::Processed + }, + } + } +} + +/// Provides a [`sp_core::Get`] to access the `MEL` of a [`codec::MaxEncodedLen`] type. +pub struct MaxEncodedLenOf(sp_std::marker::PhantomData); +impl Get for MaxEncodedLenOf { + fn get() -> u32 { + T::max_encoded_len() as u32 + } +} + +/// Calculates the maximum message length and exposed it through the [`codec::MaxEncodedLen`] trait. +pub struct MaxMessageLen( + sp_std::marker::PhantomData<(Origin, Size, HeapSize)>, +); +impl, HeapSize: Get> Get + for MaxMessageLen +{ + fn get() -> u32 { + (HeapSize::get().into()).saturating_sub(ItemHeader::::max_encoded_len() as u32) + } +} + +/// The maximal message length. +pub type MaxMessageLenOf = + MaxMessageLen, ::Size, ::HeapSize>; +/// The maximal encoded origin length. +pub type MaxOriginLenOf = MaxEncodedLenOf>; +/// The `MessageOrigin` of this pallet. +pub type MessageOriginOf = <::MessageProcessor as ProcessMessage>::Origin; +/// The maximal heap size of a page. +pub type HeapSizeU32Of = IntoU32<::HeapSize, ::Size>; +/// The [`Page`] of this pallet. +pub type PageOf = Page<::Size, ::HeapSize>; +/// The [`BookState`] of this pallet. +pub type BookStateOf = BookState>; + +/// Converts a [`sp_core::Get`] with returns a type that can be cast into an `u32` into a `Get` +/// which returns an `u32`. +pub struct IntoU32(sp_std::marker::PhantomData<(T, O)>); +impl, O: Into> Get for IntoU32 { + fn get() -> u32 { + T::get().into() + } +} + +impl ServiceQueues for Pallet { + type OverweightMessageAddress = (MessageOriginOf, PageIndex, T::Size); + + fn service_queues(weight_limit: Weight) -> Weight { + // The maximum weight that processing a single message may take. + let overweight_limit = weight_limit; + let mut weight = WeightMeter::from_limit(weight_limit); + + let mut next = match Self::bump_service_head(&mut weight) { + Some(h) => h, + None => return weight.consumed, + }; + // The last queue that did not make any progress. + // The loop aborts as soon as it arrives at this queue again without making any progress + // on other queues in between. + let mut last_no_progress = None; + + loop { + let (progressed, n) = Self::service_queue(next.clone(), &mut weight, overweight_limit); + next = match n { + Some(n) => + if !progressed { + if last_no_progress == Some(n.clone()) { + break + } + if last_no_progress.is_none() { + last_no_progress = Some(next.clone()) + } + n + } else { + last_no_progress = None; + n + }, + None => break, + } + } + weight.consumed + } + + /// Execute a single overweight message. + /// + /// The weight limit must be enough for `execute_overweight` and the message execution itself. + fn execute_overweight( + weight_limit: Weight, + (message_origin, page, index): Self::OverweightMessageAddress, + ) -> Result { + let mut weight = WeightMeter::from_limit(weight_limit); + if !weight.check_accrue( + T::WeightInfo::execute_overweight_page_removed() + .max(T::WeightInfo::execute_overweight_page_updated()), + ) { + return Err(ExecuteOverweightError::InsufficientWeight) + } + + Pallet::::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err( + |e| match e { + Error::::InsufficientWeight => ExecuteOverweightError::InsufficientWeight, + _ => ExecuteOverweightError::NotFound, + }, + ) + } +} + +impl EnqueueMessage> for Pallet { + type MaxMessageLen = + MaxMessageLen<::Origin, T::Size, T::HeapSize>; + + fn enqueue_message( + message: BoundedSlice, + origin: ::Origin, + ) { + Self::do_enqueue_message(&origin, message); + let book_state = BookStateFor::::get(&origin); + T::QueueChangeHandler::on_queue_changed(origin, book_state.message_count, book_state.size); + } + + fn enqueue_messages<'a>( + messages: impl Iterator>, + origin: ::Origin, + ) { + for message in messages { + Self::do_enqueue_message(&origin, message); + } + let book_state = BookStateFor::::get(&origin); + T::QueueChangeHandler::on_queue_changed(origin, book_state.message_count, book_state.size); + } + + fn sweep_queue(origin: MessageOriginOf) { + if !BookStateFor::::contains_key(&origin) { + return + } + let mut book_state = BookStateFor::::get(&origin); + book_state.begin = book_state.end; + if let Some(neighbours) = book_state.ready_neighbours.take() { + Self::ready_ring_unknit(&origin, neighbours); + } + BookStateFor::::insert(&origin, &book_state); + } + + fn footprint(origin: MessageOriginOf) -> Footprint { + let book_state = BookStateFor::::get(&origin); + Footprint { count: book_state.message_count, size: book_state.size } + } +} diff --git a/frame/message-queue/src/mock.rs b/frame/message-queue/src/mock.rs new file mode 100644 index 0000000000000..bb9942443e226 --- /dev/null +++ b/frame/message-queue/src/mock.rs @@ -0,0 +1,312 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +#![cfg(test)] + +pub use super::mock_helpers::*; +use super::*; + +use crate as pallet_message_queue; +use frame_support::{ + parameter_types, + traits::{ConstU32, ConstU64}, +}; +use sp_core::H256; +use sp_runtime::{ + testing::Header, + traits::{BlakeTwo256, IdentityLookup}, +}; +use sp_std::collections::btree_map::BTreeMap; + +type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic; +type Block = frame_system::mocking::MockBlock; + +frame_support::construct_runtime!( + pub enum Test where + Block = Block, + NodeBlock = Block, + UncheckedExtrinsic = UncheckedExtrinsic, + { + System: frame_system::{Pallet, Call, Config, Storage, Event}, + MessageQueue: pallet_message_queue::{Pallet, Call, Storage, Event}, + } +); +parameter_types! { + pub BlockWeights: frame_system::limits::BlockWeights = + frame_system::limits::BlockWeights::simple_max(frame_support::weights::Weight::from_ref_time(1024)); +} +impl frame_system::Config for Test { + type BaseCallFilter = frame_support::traits::Everything; + type BlockWeights = (); + type BlockLength = (); + type DbWeight = (); + type RuntimeOrigin = RuntimeOrigin; + type Index = u64; + type BlockNumber = u64; + type Hash = H256; + type RuntimeCall = RuntimeCall; + type Hashing = BlakeTwo256; + type AccountId = u64; + type Lookup = IdentityLookup; + type Header = Header; + type RuntimeEvent = RuntimeEvent; + type BlockHashCount = ConstU64<250>; + type Version = (); + type PalletInfo = PalletInfo; + type AccountData = (); + type OnNewAccount = (); + type OnKilledAccount = (); + type SystemWeightInfo = (); + type SS58Prefix = (); + type OnSetCode = (); + type MaxConsumers = ConstU32<16>; +} +parameter_types! { + pub const HeapSize: u32 = 24; + pub const MaxStale: u32 = 2; + pub const ServiceWeight: Option = Some(Weight::from_parts(10, 10)); +} +impl Config for Test { + type RuntimeEvent = RuntimeEvent; + type WeightInfo = MockedWeightInfo; + type MessageProcessor = RecordingMessageProcessor; + type Size = u32; + type QueueChangeHandler = RecordingQueueChangeHandler; + type HeapSize = HeapSize; + type MaxStale = MaxStale; + type ServiceWeight = ServiceWeight; +} + +/// Mocked `WeightInfo` impl with allows to set the weight per call. +pub struct MockedWeightInfo; + +parameter_types! { + /// Storage for `MockedWeightInfo`, do not use directly. + pub static WeightForCall: BTreeMap = Default::default(); +} + +/// Set the return value for a function from the `WeightInfo` trait. +impl MockedWeightInfo { + /// Set the weight of a specific weight function. + pub fn set_weight(call_name: &str, weight: Weight) { + let mut calls = WeightForCall::get(); + calls.insert(call_name.into(), weight); + WeightForCall::set(calls); + } +} + +impl crate::weights::WeightInfo for MockedWeightInfo { + fn reap_page() -> Weight { + WeightForCall::get().get("reap_page").copied().unwrap_or_default() + } + fn execute_overweight_page_updated() -> Weight { + WeightForCall::get() + .get("execute_overweight_page_updated") + .copied() + .unwrap_or_default() + } + fn execute_overweight_page_removed() -> Weight { + WeightForCall::get() + .get("execute_overweight_page_removed") + .copied() + .unwrap_or_default() + } + fn service_page_base_completion() -> Weight { + WeightForCall::get() + .get("service_page_base_completion") + .copied() + .unwrap_or_default() + } + fn service_page_base_no_completion() -> Weight { + WeightForCall::get() + .get("service_page_base_no_completion") + .copied() + .unwrap_or_default() + } + fn service_queue_base() -> Weight { + WeightForCall::get().get("service_queue_base").copied().unwrap_or_default() + } + fn bump_service_head() -> Weight { + WeightForCall::get().get("bump_service_head").copied().unwrap_or_default() + } + fn service_page_item() -> Weight { + WeightForCall::get().get("service_page_item").copied().unwrap_or_default() + } + fn ready_ring_knit() -> Weight { + WeightForCall::get().get("ready_ring_knit").copied().unwrap_or_default() + } + fn ready_ring_unknit() -> Weight { + WeightForCall::get().get("ready_ring_unknit").copied().unwrap_or_default() + } +} + +parameter_types! { + pub static MessagesProcessed: Vec<(Vec, MessageOrigin)> = vec![]; +} + +/// A message processor which records all processed messages into [`MessagesProcessed`]. +pub struct RecordingMessageProcessor; +impl ProcessMessage for RecordingMessageProcessor { + /// The transport from where a message originates. + type Origin = MessageOrigin; + + /// Process the given message, using no more than `weight_limit` in weight to do so. + /// + /// Consumes exactly `n` weight of all components if it starts `weight=n` and `1` otherwise. + /// Errors if given the `weight_limit` is insufficient to process the message or if the message + /// is `badformat`, `corrupt` or `unsupported` with the respective error. + fn process_message( + message: &[u8], + origin: Self::Origin, + weight_limit: Weight, + ) -> Result<(bool, Weight), ProcessMessageError> { + processing_message(message)?; + + let weight = if message.starts_with(&b"weight="[..]) { + let mut w: u64 = 0; + for &c in &message[7..] { + if (b'0'..=b'9').contains(&c) { + w = w * 10 + (c - b'0') as u64; + } else { + break + } + } + w + } else { + 1 + }; + let weight = Weight::from_parts(weight, weight); + + if weight.all_lte(weight_limit) { + let mut m = MessagesProcessed::get(); + m.push((message.to_vec(), origin)); + MessagesProcessed::set(m); + Ok((true, weight)) + } else { + Err(ProcessMessageError::Overweight(weight)) + } + } +} + +/// Processed a mocked message. Messages that end with `badformat`, `corrupt` or `unsupported` will +/// fail with the respective error. +fn processing_message(msg: &[u8]) -> Result<(), ProcessMessageError> { + let msg = String::from_utf8_lossy(msg); + if msg.ends_with("badformat") { + Err(ProcessMessageError::BadFormat) + } else if msg.ends_with("corrupt") { + Err(ProcessMessageError::Corrupt) + } else if msg.ends_with("unsupported") { + Err(ProcessMessageError::Unsupported) + } else { + Ok(()) + } +} + +parameter_types! { + pub static NumMessagesProcessed: usize = 0; + pub static NumMessagesErrored: usize = 0; +} + +/// Similar to [`RecordingMessageProcessor`] but only counts the number of messages processed and +/// does always consume one weight per message. +/// +/// The [`RecordingMessageProcessor`] is a bit too slow for the integration tests. +pub struct CountingMessageProcessor; +impl ProcessMessage for CountingMessageProcessor { + type Origin = MessageOrigin; + + fn process_message( + message: &[u8], + _origin: Self::Origin, + weight_limit: Weight, + ) -> Result<(bool, Weight), ProcessMessageError> { + if let Err(e) = processing_message(message) { + NumMessagesErrored::set(NumMessagesErrored::get() + 1); + return Err(e) + } + let weight = Weight::from_parts(1, 1); + + if weight.all_lte(weight_limit) { + NumMessagesProcessed::set(NumMessagesProcessed::get() + 1); + Ok((true, weight)) + } else { + Err(ProcessMessageError::Overweight(weight)) + } + } +} + +parameter_types! { + /// Storage for `RecordingQueueChangeHandler`, do not use directly. + pub static QueueChanges: Vec<(MessageOrigin, u64, u64)> = vec![]; +} + +/// Records all queue changes into [`QueueChanges`]. +pub struct RecordingQueueChangeHandler; +impl OnQueueChanged for RecordingQueueChangeHandler { + fn on_queue_changed(id: MessageOrigin, items_count: u64, items_size: u64) { + QueueChanges::mutate(|cs| cs.push((id, items_count, items_size))); + } +} + +/// Create new test externalities. +/// +/// Is generic since it is used by the unit test, integration tests and benchmarks. +pub fn new_test_ext() -> sp_io::TestExternalities +where + ::BlockNumber: From, +{ + sp_tracing::try_init_simple(); + WeightForCall::take(); + QueueChanges::take(); + NumMessagesErrored::take(); + let t = frame_system::GenesisConfig::default().build_storage::().unwrap(); + let mut ext = sp_io::TestExternalities::new(t); + ext.execute_with(|| frame_system::Pallet::::set_block_number(1.into())); + ext +} + +/// Set the weight of a specific weight function. +pub fn set_weight(name: &str, w: Weight) { + MockedWeightInfo::set_weight::(name, w); +} + +/// Assert that exactly these pages are present. Assumes `Here` origin. +pub fn assert_pages(indices: &[u32]) { + assert_eq!(Pages::::iter().count(), indices.len()); + for i in indices { + assert!(Pages::::contains_key(MessageOrigin::Here, i)); + } +} + +/// Build a ring with three queues: `Here`, `There` and `Everywhere(0)`. +pub fn build_triple_ring() { + use MessageOrigin::*; + build_ring::(&[Here, There, Everywhere(0)]) +} + +/// Shim to get rid of the annoying `::` everywhere. +pub fn assert_ring(queues: &[MessageOrigin]) { + super::mock_helpers::assert_ring::(queues); +} + +pub fn knit(queue: &MessageOrigin) { + super::mock_helpers::knit::(queue); +} + +pub fn unknit(queue: &MessageOrigin) { + super::mock_helpers::unknit::(queue); +} diff --git a/frame/message-queue/src/mock_helpers.rs b/frame/message-queue/src/mock_helpers.rs new file mode 100644 index 0000000000000..39d961d8fc558 --- /dev/null +++ b/frame/message-queue/src/mock_helpers.rs @@ -0,0 +1,185 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Std setup helpers for testing and benchmarking. +//! +//! Cannot be put into mock.rs since benchmarks require no-std and mock.rs is std. + +use crate::*; +use frame_support::traits::Defensive; + +/// Converts `Self` into a `Weight` by using `Self` for all components. +pub trait IntoWeight { + fn into_weight(self) -> Weight; +} + +impl IntoWeight for u64 { + fn into_weight(self) -> Weight { + Weight::from_parts(self, self) + } +} + +/// Mocked message origin for testing. +#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, MaxEncodedLen, TypeInfo, Debug)] +pub enum MessageOrigin { + Here, + There, + Everywhere(u32), +} + +impl From for MessageOrigin { + fn from(i: u32) -> Self { + Self::Everywhere(i) + } +} + +/// Processes any message and consumes (1, 1) weight per message. +pub struct NoopMessageProcessor; +impl ProcessMessage for NoopMessageProcessor { + type Origin = MessageOrigin; + + fn process_message( + _message: &[u8], + _origin: Self::Origin, + weight_limit: Weight, + ) -> Result<(bool, Weight), ProcessMessageError> { + let weight = Weight::from_parts(1, 1); + + if weight.all_lte(weight_limit) { + Ok((true, weight)) + } else { + Err(ProcessMessageError::Overweight(weight)) + } + } +} + +/// Create a message from the given data. +pub fn msg>(x: &'static str) -> BoundedSlice { + BoundedSlice::defensive_truncate_from(x.as_bytes()) +} + +pub fn vmsg(x: &'static str) -> Vec { + x.as_bytes().to_vec() +} + +/// Create a page from a single message. +pub fn page(msg: &[u8]) -> PageOf { + PageOf::::from_message::(msg.try_into().unwrap()) +} + +pub fn single_page_book() -> BookStateOf { + BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 } +} + +pub fn empty_book() -> BookStateOf { + BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 } +} + +/// Returns a full page of messages with their index as payload and the number of messages. +pub fn full_page() -> (PageOf, usize) { + let mut msgs = 0; + let mut page = PageOf::::default(); + for i in 0..u32::MAX { + let r = i.using_encoded(|d| page.try_append_message::(d.try_into().unwrap())); + if r.is_err() { + break + } else { + msgs += 1; + } + } + assert!(msgs > 0, "page must hold at least one message"); + (page, msgs) +} + +/// Returns a page filled with empty messages and the number of messages. +pub fn book_for(page: &PageOf) -> BookStateOf { + BookState { + count: 1, + begin: 0, + end: 1, + ready_neighbours: None, + message_count: page.remaining.into() as u64, + size: page.remaining_size.into() as u64, + } +} + +/// Assert the last event that was emitted. +#[cfg(any(feature = "std", feature = "runtime-benchmarks", test))] +pub fn assert_last_event(generic_event: ::RuntimeEvent) { + assert!( + !frame_system::Pallet::::block_number().is_zero(), + "The genesis block has n o events" + ); + frame_system::Pallet::::assert_last_event(generic_event.into()); +} + +/// Provide a setup for `bump_service_head`. +pub fn setup_bump_service_head( + current: <::MessageProcessor as ProcessMessage>::Origin, + next: <::MessageProcessor as ProcessMessage>::Origin, +) { + let mut book = single_page_book::(); + book.ready_neighbours = Some(Neighbours::> { prev: next.clone(), next }); + ServiceHead::::put(¤t); + BookStateFor::::insert(¤t, &book); +} + +/// Knit a queue into the ready-ring and write it back to storage. +pub fn knit(o: &<::MessageProcessor as ProcessMessage>::Origin) { + let mut b = BookStateFor::::get(o); + b.ready_neighbours = crate::Pallet::::ready_ring_knit(o).ok().defensive(); + BookStateFor::::insert(o, b); +} + +/// Unknit a queue into the ready-ring and write it back to storage. +pub fn unknit(o: &<::MessageProcessor as ProcessMessage>::Origin) { + let mut b = BookStateFor::::get(o); + crate::Pallet::::ready_ring_unknit(o, b.ready_neighbours.unwrap()); + b.ready_neighbours = None; + BookStateFor::::insert(o, b); +} + +/// Build a ring with three queues: `Here`, `There` and `Everywhere(0)`. +pub fn build_ring( + queues: &[<::MessageProcessor as ProcessMessage>::Origin], +) { + for queue in queues { + BookStateFor::::insert(queue, empty_book::()); + } + for queue in queues { + knit::(queue); + } + assert_ring::(queues); +} + +/// Check that the Ready Ring consists of `queues` in that exact order. +/// +/// Also check that all backlinks are valid and that the first element is the service head. +pub fn assert_ring( + queues: &[<::MessageProcessor as ProcessMessage>::Origin], +) { + for (i, origin) in queues.iter().enumerate() { + let book = BookStateFor::::get(origin); + assert_eq!( + book.ready_neighbours, + Some(Neighbours { + prev: queues[(i + queues.len() - 1) % queues.len()].clone(), + next: queues[(i + 1) % queues.len()].clone(), + }) + ); + } + assert_eq!(ServiceHead::::get(), queues.first().cloned()); +} diff --git a/frame/message-queue/src/tests.rs b/frame/message-queue/src/tests.rs new file mode 100644 index 0000000000000..103fb690ddba7 --- /dev/null +++ b/frame/message-queue/src/tests.rs @@ -0,0 +1,1092 @@ +// This file is part of Substrate. + +// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for Message Queue Pallet. + +#![cfg(test)] + +use crate::{mock::*, *}; + +use frame_support::{assert_noop, assert_ok, assert_storage_noop, StorageNoopGuard}; +use rand::{rngs::StdRng, Rng, SeedableRng}; + +#[test] +fn mocked_weight_works() { + new_test_ext::().execute_with(|| { + assert!(::WeightInfo::service_queue_base().is_zero()); + }); + new_test_ext::().execute_with(|| { + set_weight("service_queue_base", Weight::MAX); + assert_eq!(::WeightInfo::service_queue_base(), Weight::MAX); + }); + // The externalities reset it. + new_test_ext::().execute_with(|| { + assert!(::WeightInfo::service_queue_base().is_zero()); + }); +} + +#[test] +fn enqueue_within_one_page_works() { + new_test_ext::().execute_with(|| { + use MessageOrigin::*; + MessageQueue::enqueue_message(msg("a"), Here); + MessageQueue::enqueue_message(msg("b"), Here); + MessageQueue::enqueue_message(msg("c"), Here); + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(b"a".to_vec(), Here), (b"b".to_vec(), Here)]); + + assert_eq!(MessageQueue::service_queues(2.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(b"c".to_vec(), Here)]); + + assert_eq!(MessageQueue::service_queues(2.into_weight()), 0.into_weight()); + assert!(MessagesProcessed::get().is_empty()); + + MessageQueue::enqueue_messages([msg("a"), msg("b"), msg("c")].into_iter(), There); + + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!( + MessagesProcessed::take(), + vec![(b"a".to_vec(), There), (b"b".to_vec(), There),] + ); + + MessageQueue::enqueue_message(msg("d"), Everywhere(1)); + + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!(MessageQueue::service_queues(2.into_weight()), 0.into_weight()); + assert_eq!( + MessagesProcessed::take(), + vec![(b"c".to_vec(), There), (b"d".to_vec(), Everywhere(1))] + ); + }); +} + +#[test] +fn queue_priority_retains() { + new_test_ext::().execute_with(|| { + use MessageOrigin::*; + assert_ring(&[]); + MessageQueue::enqueue_message(msg("a"), Everywhere(1)); + assert_ring(&[Everywhere(1)]); + MessageQueue::enqueue_message(msg("b"), Everywhere(2)); + assert_ring(&[Everywhere(1), Everywhere(2)]); + MessageQueue::enqueue_message(msg("c"), Everywhere(3)); + assert_ring(&[Everywhere(1), Everywhere(2), Everywhere(3)]); + MessageQueue::enqueue_message(msg("d"), Everywhere(2)); + assert_ring(&[Everywhere(1), Everywhere(2), Everywhere(3)]); + // service head is 1, it will process a, leaving service head at 2. it also processes b but + // doees not empty queue 2, so service head will end at 2. + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!( + MessagesProcessed::take(), + vec![(vmsg("a"), Everywhere(1)), (vmsg("b"), Everywhere(2)),] + ); + assert_ring(&[Everywhere(2), Everywhere(3)]); + // service head is 2, so will process d first, then c. + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!( + MessagesProcessed::get(), + vec![(vmsg("d"), Everywhere(2)), (vmsg("c"), Everywhere(3)),] + ); + assert_ring(&[]); + }); +} + +#[test] +fn queue_priority_reset_once_serviced() { + new_test_ext::().execute_with(|| { + use MessageOrigin::*; + MessageQueue::enqueue_message(msg("a"), Everywhere(1)); + MessageQueue::enqueue_message(msg("b"), Everywhere(2)); + MessageQueue::enqueue_message(msg("c"), Everywhere(3)); + // service head is 1, it will process a, leaving service head at 2. it also processes b and + // empties queue 2, so service head will end at 3. + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + MessageQueue::enqueue_message(msg("d"), Everywhere(2)); + // service head is 3, so will process c first, then d. + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + + assert_eq!( + MessagesProcessed::get(), + vec![ + (vmsg("a"), Everywhere(1)), + (vmsg("b"), Everywhere(2)), + (vmsg("c"), Everywhere(3)), + (vmsg("d"), Everywhere(2)), + ] + ); + }); +} + +#[test] +fn service_queues_basic_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here); + MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There); + assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]); + + // Service one message from `Here`. + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]); + assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]); + + // Service one message from `There`. + ServiceHead::::set(There.into()); + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]); + assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]); + + // Service the remaining from `Here`. + ServiceHead::::set(Here.into()); + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here), (vmsg("abc"), Here)]); + assert_eq!(QueueChanges::take(), vec![(Here, 0, 0)]); + + // Service all remaining messages. + assert_eq!(MessageQueue::service_queues(Weight::MAX), 2.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("xy"), There), (vmsg("xyz"), There)]); + assert_eq!(QueueChanges::take(), vec![(There, 0, 0)]); + }); +} + +#[test] +fn service_queues_failing_messages_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + set_weight("service_page_item", 1.into_weight()); + MessageQueue::enqueue_message(msg("badformat"), Here); + MessageQueue::enqueue_message(msg("corrupt"), Here); + MessageQueue::enqueue_message(msg("unsupported"), Here); + // Starts with three pages. + assert_pages(&[0, 1, 2]); + + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_last_event::( + Event::ProcessingFailed { + hash: ::Hashing::hash(b"badformat"), + origin: MessageOrigin::Here, + error: ProcessMessageError::BadFormat, + } + .into(), + ); + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_last_event::( + Event::ProcessingFailed { + hash: ::Hashing::hash(b"corrupt"), + origin: MessageOrigin::Here, + error: ProcessMessageError::Corrupt, + } + .into(), + ); + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_last_event::( + Event::ProcessingFailed { + hash: ::Hashing::hash(b"unsupported"), + origin: MessageOrigin::Here, + error: ProcessMessageError::Unsupported, + } + .into(), + ); + // All pages removed. + assert_pages(&[]); + }); +} + +#[test] +fn reap_page_permanent_overweight_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + // Create 10 pages more than the stale limit. + let n = (MaxStale::get() + 10) as usize; + for _ in 0..n { + MessageQueue::enqueue_message(msg("weight=2"), Here); + } + assert_eq!(Pages::::iter().count(), n); + assert_eq!(QueueChanges::take().len(), n); + // Mark all pages as stale since their message is permanently overweight. + MessageQueue::service_queues(1.into_weight()); + + // Check that we can reap everything below the watermark. + let max_stale = MaxStale::get(); + for i in 0..n as u32 { + let b = BookStateFor::::get(Here); + let stale_pages = n as u32 - i; + let overflow = stale_pages.saturating_sub(max_stale + 1) + 1; + let backlog = (max_stale * max_stale / overflow).max(max_stale); + let watermark = b.begin.saturating_sub(backlog); + + if i >= watermark { + break + } + assert_ok!(MessageQueue::do_reap_page(&Here, i)); + assert_eq!(QueueChanges::take(), vec![(Here, b.message_count - 1, b.size - 8)]); + } + + // Cannot reap any more pages. + for (o, i, _) in Pages::::iter() { + assert_noop!(MessageQueue::do_reap_page(&o, i), Error::::NotReapable); + assert!(QueueChanges::take().is_empty()); + } + }); +} + +#[test] +fn reaping_overweight_fails_properly() { + use MessageOrigin::*; + assert_eq!(MaxStale::get(), 2, "The stale limit is two"); + + new_test_ext::().execute_with(|| { + // page 0 + MessageQueue::enqueue_message(msg("weight=4"), Here); + MessageQueue::enqueue_message(msg("a"), Here); + // page 1 + MessageQueue::enqueue_message(msg("weight=4"), Here); + MessageQueue::enqueue_message(msg("b"), Here); + // page 2 + MessageQueue::enqueue_message(msg("weight=4"), Here); + MessageQueue::enqueue_message(msg("c"), Here); + // page 3 + MessageQueue::enqueue_message(msg("bigbig 1"), Here); + // page 4 + MessageQueue::enqueue_message(msg("bigbig 2"), Here); + // page 5 + MessageQueue::enqueue_message(msg("bigbig 3"), Here); + // Double-check that exactly these pages exist. + assert_pages(&[0, 1, 2, 3, 4, 5]); + + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here), (vmsg("b"), Here)]); + // 2 stale now. + + // Nothing reapable yet, because we haven't hit the stale limit. + for (o, i, _) in Pages::::iter() { + assert_noop!(MessageQueue::do_reap_page(&o, i), Error::::NotReapable); + } + assert_pages(&[0, 1, 2, 3, 4, 5]); + + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("c"), Here)]); + // 3 stale now: can take something 4 pages in history. + + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 1"), Here)]); + + // Nothing reapable yet, because we haven't hit the stale limit. + for (o, i, _) in Pages::::iter() { + assert_noop!(MessageQueue::do_reap_page(&o, i), Error::::NotReapable); + } + assert_pages(&[0, 1, 2, 4, 5]); + + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 2"), Here)]); + assert_pages(&[0, 1, 2, 5]); + + // First is now reapable as it is too far behind the first ready page (5). + assert_ok!(MessageQueue::do_reap_page(&Here, 0)); + // Others not reapable yet, because we haven't hit the stale limit. + for (o, i, _) in Pages::::iter() { + assert_noop!(MessageQueue::do_reap_page(&o, i), Error::::NotReapable); + } + assert_pages(&[1, 2, 5]); + + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 3"), Here)]); + + assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::::NoPage); + assert_noop!(MessageQueue::do_reap_page(&Here, 3), Error::::NoPage); + assert_noop!(MessageQueue::do_reap_page(&Here, 4), Error::::NoPage); + // Still not reapable, since the number of stale pages is only 2. + for (o, i, _) in Pages::::iter() { + assert_noop!(MessageQueue::do_reap_page(&o, i), Error::::NotReapable); + } + }); +} + +#[test] +fn service_queue_bails() { + // Not enough weight for `service_queue_base`. + new_test_ext::().execute_with(|| { + set_weight("service_queue_base", 2.into_weight()); + let mut meter = WeightMeter::from_limit(1.into_weight()); + + assert_storage_noop!(MessageQueue::service_queue(0u32.into(), &mut meter, Weight::MAX)); + assert!(meter.consumed.is_zero()); + }); + // Not enough weight for `ready_ring_unknit`. + new_test_ext::().execute_with(|| { + set_weight("ready_ring_unknit", 2.into_weight()); + let mut meter = WeightMeter::from_limit(1.into_weight()); + + assert_storage_noop!(MessageQueue::service_queue(0u32.into(), &mut meter, Weight::MAX)); + assert!(meter.consumed.is_zero()); + }); + // Not enough weight for `service_queue_base` and `ready_ring_unknit`. + new_test_ext::().execute_with(|| { + set_weight("service_queue_base", 2.into_weight()); + set_weight("ready_ring_unknit", 2.into_weight()); + + let mut meter = WeightMeter::from_limit(3.into_weight()); + assert_storage_noop!(MessageQueue::service_queue(0.into(), &mut meter, Weight::MAX)); + assert!(meter.consumed.is_zero()); + }); +} + +#[test] +fn service_page_works() { + use super::integration_test::Test; // Run with larger page size. + use MessageOrigin::*; + use PageExecutionStatus::*; + new_test_ext::().execute_with(|| { + set_weight("service_page_base_completion", 2.into_weight()); + set_weight("service_page_item", 3.into_weight()); + + let (page, mut msgs) = full_page::(); + assert!(msgs >= 10, "pre-condition: need at least 10 msgs per page"); + let mut book = book_for::(&page); + Pages::::insert(Here, 0, page); + + // Call it a few times each with a random weight limit. + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + while msgs > 0 { + let process = rng.gen_range(0..=msgs); + msgs -= process; + + // Enough weight to process `process` messages. + let mut meter = WeightMeter::from_limit(((2 + (3 + 1) * process) as u64).into_weight()); + System::reset_events(); + let (processed, status) = + crate::Pallet::::service_page(&Here, &mut book, &mut meter, Weight::MAX); + assert_eq!(processed as usize, process); + assert_eq!(NumMessagesProcessed::take(), process); + assert_eq!(System::events().len(), process); + if msgs == 0 { + assert_eq!(status, NoMore); + } else { + assert_eq!(status, Bailed); + } + } + assert!(!Pages::::contains_key(Here, 0), "The page got removed"); + }); +} + +// `service_page` does nothing when called with an insufficient weight limit. +#[test] +fn service_page_bails() { + // Not enough weight for `service_page_base_completion`. + new_test_ext::().execute_with(|| { + set_weight("service_page_base_completion", 2.into_weight()); + let mut meter = WeightMeter::from_limit(1.into_weight()); + + let (page, _) = full_page::(); + let mut book = book_for::(&page); + Pages::::insert(MessageOrigin::Here, 0, page); + + assert_storage_noop!(MessageQueue::service_page( + &MessageOrigin::Here, + &mut book, + &mut meter, + Weight::MAX + )); + assert!(meter.consumed.is_zero()); + }); + // Not enough weight for `service_page_base_no_completion`. + new_test_ext::().execute_with(|| { + set_weight("service_page_base_no_completion", 2.into_weight()); + let mut meter = WeightMeter::from_limit(1.into_weight()); + + let (page, _) = full_page::(); + let mut book = book_for::(&page); + Pages::::insert(MessageOrigin::Here, 0, page); + + assert_storage_noop!(MessageQueue::service_page( + &MessageOrigin::Here, + &mut book, + &mut meter, + Weight::MAX + )); + assert!(meter.consumed.is_zero()); + }); +} + +#[test] +fn service_page_item_bails() { + new_test_ext::().execute_with(|| { + let _guard = StorageNoopGuard::default(); + let (mut page, _) = full_page::(); + let mut weight = WeightMeter::from_limit(10.into_weight()); + let overweight_limit = 10.into_weight(); + set_weight("service_page_item", 11.into_weight()); + + assert_eq!( + MessageQueue::service_page_item( + &MessageOrigin::Here, + 0, + &mut book_for::(&page), + &mut page, + &mut weight, + overweight_limit, + ), + ItemExecutionStatus::Bailed + ); + }); +} + +#[test] +fn bump_service_head_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + // Create a ready ring with three queues. + BookStateFor::::insert(Here, empty_book::()); + knit(&Here); + BookStateFor::::insert(There, empty_book::()); + knit(&There); + BookStateFor::::insert(Everywhere(0), empty_book::()); + knit(&Everywhere(0)); + + // Bump 99 times. + for i in 0..99 { + let current = MessageQueue::bump_service_head(&mut WeightMeter::max_limit()).unwrap(); + assert_eq!(current, [Here, There, Everywhere(0)][i % 3]); + } + + // The ready ring is intact and the service head is still `Here`. + assert_ring(&[Here, There, Everywhere(0)]); + }); +} + +/// `bump_service_head` does nothing when called with an insufficient weight limit. +#[test] +fn bump_service_head_bails() { + new_test_ext::().execute_with(|| { + set_weight("bump_service_head", 2.into_weight()); + setup_bump_service_head::(0.into(), 10.into()); + + let _guard = StorageNoopGuard::default(); + let mut meter = WeightMeter::from_limit(1.into_weight()); + assert!(MessageQueue::bump_service_head(&mut meter).is_none()); + assert_eq!(meter.consumed, 0.into_weight()); + }); +} + +#[test] +fn bump_service_head_trivial_works() { + new_test_ext::().execute_with(|| { + set_weight("bump_service_head", 2.into_weight()); + let mut meter = WeightMeter::max_limit(); + + assert_eq!(MessageQueue::bump_service_head(&mut meter), None, "Cannot bump"); + assert_eq!(meter.consumed, 2.into_weight()); + + setup_bump_service_head::(0.into(), 1.into()); + + assert_eq!(MessageQueue::bump_service_head(&mut meter), Some(0.into())); + assert_eq!(ServiceHead::::get().unwrap(), 1.into(), "Bumped the head"); + assert_eq!(meter.consumed, 4.into_weight()); + + assert_eq!(MessageQueue::bump_service_head(&mut meter), None, "Cannot bump"); + assert_eq!(meter.consumed, 6.into_weight()); + }); +} + +#[test] +fn bump_service_head_no_head_noops() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + // Create a ready ring with three queues. + BookStateFor::::insert(Here, empty_book::()); + knit(&Here); + BookStateFor::::insert(There, empty_book::()); + knit(&There); + BookStateFor::::insert(Everywhere(0), empty_book::()); + knit(&Everywhere(0)); + + // But remove the service head. + ServiceHead::::kill(); + + // Nothing happens. + assert_storage_noop!(MessageQueue::bump_service_head(&mut WeightMeter::max_limit())); + }); +} + +#[test] +fn service_page_item_consumes_correct_weight() { + new_test_ext::().execute_with(|| { + let mut page = page::(b"weight=3"); + let mut weight = WeightMeter::from_limit(10.into_weight()); + let overweight_limit = 0.into_weight(); + set_weight("service_page_item", 2.into_weight()); + + assert_eq!( + MessageQueue::service_page_item( + &MessageOrigin::Here, + 0, + &mut book_for::(&page), + &mut page, + &mut weight, + overweight_limit + ), + ItemExecutionStatus::Executed(true) + ); + assert_eq!(weight.consumed, 5.into_weight()); + }); +} + +/// `service_page_item` skips a permanently `Overweight` message and marks it as `unprocessed`. +#[test] +fn service_page_item_skips_perm_overweight_message() { + new_test_ext::().execute_with(|| { + let mut page = page::(b"TooMuch"); + let mut weight = WeightMeter::from_limit(2.into_weight()); + let overweight_limit = 0.into_weight(); + set_weight("service_page_item", 2.into_weight()); + + assert_eq!( + crate::Pallet::::service_page_item( + &MessageOrigin::Here, + 0, + &mut book_for::(&page), + &mut page, + &mut weight, + overweight_limit + ), + ItemExecutionStatus::Executed(false) + ); + assert_eq!(weight.consumed, 2.into_weight()); + assert_last_event::( + Event::OverweightEnqueued { + hash: ::Hashing::hash(b"TooMuch"), + origin: MessageOrigin::Here, + message_index: 0, + page_index: 0, + } + .into(), + ); + + // Check that the message was skipped. + let (pos, processed, payload) = page.peek_index(0).unwrap(); + assert_eq!(pos, 0); + assert!(!processed); + assert_eq!(payload, b"TooMuch".encode()); + }); +} + +#[test] +fn peek_index_works() { + use super::integration_test::Test; // Run with larger page size. + new_test_ext::().execute_with(|| { + // Fill a page with messages. + let (mut page, msgs) = full_page::(); + let msg_enc_len = ItemHeader::<::Size>::max_encoded_len() + 4; + + for i in 0..msgs { + // Skip all even messages. + page.skip_first(i % 2 == 0); + // Peek each message and check that it is correct. + let (pos, processed, payload) = page.peek_index(i).unwrap(); + assert_eq!(pos, msg_enc_len * i); + assert_eq!(processed, i % 2 == 0); + // `full_page` uses the index as payload. + assert_eq!(payload, (i as u32).encode()); + } + }); +} + +#[test] +fn peek_first_and_skip_first_works() { + use super::integration_test::Test; // Run with larger page size. + new_test_ext::().execute_with(|| { + // Fill a page with messages. + let (mut page, msgs) = full_page::(); + + for i in 0..msgs { + let msg = page.peek_first().unwrap(); + // `full_page` uses the index as payload. + assert_eq!(msg.deref(), (i as u32).encode()); + page.skip_first(i % 2 == 0); // True of False should not matter here. + } + assert!(page.peek_first().is_none(), "Page must be at the end"); + + // Check that all messages were correctly marked as (un)processed. + for i in 0..msgs { + let (_, processed, _) = page.peek_index(i).unwrap(); + assert_eq!(processed, i % 2 == 0); + } + }); +} + +#[test] +fn note_processed_at_pos_works() { + use super::integration_test::Test; // Run with larger page size. + new_test_ext::().execute_with(|| { + let (mut page, msgs) = full_page::(); + + for i in 0..msgs { + let (pos, processed, _) = page.peek_index(i).unwrap(); + assert!(!processed); + assert_eq!(page.remaining as usize, msgs - i); + + page.note_processed_at_pos(pos); + + let (_, processed, _) = page.peek_index(i).unwrap(); + assert!(processed); + assert_eq!(page.remaining as usize, msgs - i - 1); + } + // `skip_first` still works fine. + for _ in 0..msgs { + page.peek_first().unwrap(); + page.skip_first(false); + } + assert!(page.peek_first().is_none()); + }); +} + +#[test] +fn note_processed_at_pos_idempotent() { + let (mut page, _) = full_page::(); + page.note_processed_at_pos(0); + + let original = page.clone(); + page.note_processed_at_pos(0); + assert_eq!(page.heap, original.heap); +} + +#[test] +fn is_complete_works() { + use super::integration_test::Test; // Run with larger page size. + new_test_ext::().execute_with(|| { + let (mut page, msgs) = full_page::(); + assert!(msgs > 3, "Boring"); + let msg_enc_len = ItemHeader::<::Size>::max_encoded_len() + 4; + + assert!(!page.is_complete()); + for i in 0..msgs { + if i % 2 == 0 { + page.skip_first(false); + } else { + page.note_processed_at_pos(msg_enc_len * i); + } + } + // Not complete since `skip_first` was called with `false`. + assert!(!page.is_complete()); + for i in 0..msgs { + if i % 2 == 0 { + assert!(!page.is_complete()); + let (pos, _, _) = page.peek_index(i).unwrap(); + page.note_processed_at_pos(pos); + } + } + assert!(page.is_complete()); + assert_eq!(page.remaining_size, 0); + // Each message is marked as processed. + for i in 0..msgs { + let (_, processed, _) = page.peek_index(i).unwrap(); + assert!(processed); + } + }); +} + +#[test] +fn page_from_message_basic_works() { + assert!(MaxMessageLenOf::::get() > 0, "pre-condition unmet"); + let mut msg: BoundedVec> = Default::default(); + msg.bounded_resize(MaxMessageLenOf::::get() as usize, 123); + + let page = PageOf::::from_message::(msg.as_bounded_slice()); + assert_eq!(page.remaining, 1); + assert_eq!(page.remaining_size as usize, msg.len()); + assert!(page.first_index == 0 && page.first == 0 && page.last == 0); + + // Verify the content of the heap. + let mut heap = Vec::::new(); + let header = + ItemHeader::<::Size> { payload_len: msg.len() as u32, is_processed: false }; + heap.extend(header.encode()); + heap.extend(msg.deref()); + assert_eq!(page.heap, heap); +} + +#[test] +fn page_try_append_message_basic_works() { + use super::integration_test::Test; // Run with larger page size. + + let mut page = PageOf::::default(); + let mut msgs = 0; + // Append as many 4-byte message as possible. + for i in 0..u32::MAX { + let r = i.using_encoded(|i| page.try_append_message::(i.try_into().unwrap())); + if r.is_err() { + break + } else { + msgs += 1; + } + } + let expected_msgs = (::HeapSize::get()) / + (ItemHeader::<::Size>::max_encoded_len() as u32 + 4); + assert_eq!(expected_msgs, msgs, "Wrong number of messages"); + assert_eq!(page.remaining, msgs); + assert_eq!(page.remaining_size, msgs * 4); + + // Verify that the heap content is correct. + let mut heap = Vec::::new(); + for i in 0..msgs { + let header = ItemHeader::<::Size> { payload_len: 4, is_processed: false }; + heap.extend(header.encode()); + heap.extend(i.encode()); + } + assert_eq!(page.heap, heap); +} + +#[test] +fn page_try_append_message_max_msg_len_works_works() { + use super::integration_test::Test; // Run with larger page size. + + // We start off with an empty page. + let mut page = PageOf::::default(); + // … and append a message with maximum possible length. + let msg = vec![123u8; MaxMessageLenOf::::get() as usize]; + // … which works. + page.try_append_message::(BoundedSlice::defensive_truncate_from(&msg)) + .unwrap(); + // Now we cannot append *anything* since the heap is full. + page.try_append_message::(BoundedSlice::defensive_truncate_from(&[])) + .unwrap_err(); + assert_eq!(page.heap.len(), ::HeapSize::get() as usize); +} + +#[test] +fn page_try_append_message_with_remaining_size_works_works() { + use super::integration_test::Test; // Run with larger page size. + let header_size = ItemHeader::<::Size>::max_encoded_len(); + + // We start off with an empty page. + let mut page = PageOf::::default(); + let mut remaining = ::HeapSize::get() as usize; + let mut msgs = Vec::new(); + let mut rng = StdRng::seed_from_u64(42); + // Now we keep appending messages with different lengths. + while remaining >= header_size { + let take = rng.gen_range(0..=(remaining - header_size)); + let msg = vec![123u8; take]; + page.try_append_message::(BoundedSlice::defensive_truncate_from(&msg)) + .unwrap(); + remaining -= take + header_size; + msgs.push(msg); + } + // Cannot even fit a single header in there now. + assert!(remaining < header_size); + assert_eq!(::HeapSize::get() as usize - page.heap.len(), remaining); + assert_eq!(page.remaining as usize, msgs.len()); + assert_eq!( + page.remaining_size as usize, + msgs.iter().fold(0, |mut a, m| { + a += m.len(); + a + }) + ); + // Verify the heap content. + let mut heap = Vec::new(); + for msg in msgs.into_iter() { + let header = ItemHeader::<::Size> { + payload_len: msg.len() as u32, + is_processed: false, + }; + heap.extend(header.encode()); + heap.extend(msg); + } + assert_eq!(page.heap, heap); +} + +// `Page::from_message` does not panic when called with the maximum message and origin lengths. +#[test] +fn page_from_message_max_len_works() { + let max_msg_len: usize = MaxMessageLenOf::::get() as usize; + + let page = PageOf::::from_message::(vec![1; max_msg_len][..].try_into().unwrap()); + + assert_eq!(page.remaining, 1); +} + +#[test] +fn sweep_queue_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + build_triple_ring(); + + let book = BookStateFor::::get(Here); + assert!(book.begin != book.end); + // Removing the service head works + assert_eq!(ServiceHead::::get(), Some(Here)); + MessageQueue::sweep_queue(Here); + assert_ring(&[There, Everywhere(0)]); + // The book still exits, but has updated begin and end. + let book = BookStateFor::::get(Here); + assert_eq!(book.begin, book.end); + + // Removing something that is not the service head works. + assert!(ServiceHead::::get() != Some(Everywhere(0))); + MessageQueue::sweep_queue(Everywhere(0)); + assert_ring(&[There]); + // The book still exits, but has updated begin and end. + let book = BookStateFor::::get(Everywhere(0)); + assert_eq!(book.begin, book.end); + + MessageQueue::sweep_queue(There); + // The book still exits, but has updated begin and end. + let book = BookStateFor::::get(There); + assert_eq!(book.begin, book.end); + assert_ring(&[]); + + // Sweeping a queue never calls OnQueueChanged. + assert!(QueueChanges::take().is_empty()); + }) +} + +/// Test that `sweep_queue` also works if the ReadyRing wraps around. +#[test] +fn sweep_queue_wraps_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + BookStateFor::::insert(Here, empty_book::()); + knit(&Here); + + MessageQueue::sweep_queue(Here); + let book = BookStateFor::::get(Here); + assert!(book.ready_neighbours.is_none()); + }); +} + +#[test] +fn sweep_queue_invalid_noops() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + assert_storage_noop!(MessageQueue::sweep_queue(Here)); + }); +} + +#[test] +fn footprint_works() { + new_test_ext::().execute_with(|| { + let origin = MessageOrigin::Here; + let (page, msgs) = full_page::(); + let book = book_for::(&page); + BookStateFor::::insert(origin, book); + + let info = MessageQueue::footprint(origin); + assert_eq!(info.count as usize, msgs); + assert_eq!(info.size, page.remaining_size as u64); + + // Sweeping a queue never calls OnQueueChanged. + assert!(QueueChanges::take().is_empty()); + }) +} + +/// The footprint of an invalid queue is the default footprint. +#[test] +fn footprint_invalid_works() { + new_test_ext::().execute_with(|| { + let origin = MessageOrigin::Here; + assert_eq!(MessageQueue::footprint(origin), Default::default()); + }) +} + +/// The footprint of a swept queue is still correct. +#[test] +fn footprint_on_swept_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + let mut book = empty_book::(); + book.message_count = 3; + book.size = 10; + BookStateFor::::insert(Here, &book); + knit(&Here); + + MessageQueue::sweep_queue(Here); + let fp = MessageQueue::footprint(Here); + assert_eq!(fp.count, 3); + assert_eq!(fp.size, 10); + }) +} + +#[test] +fn execute_overweight_works() { + new_test_ext::().execute_with(|| { + set_weight("bump_service_head", 1.into_weight()); + set_weight("service_queue_base", 1.into_weight()); + set_weight("service_page_base_completion", 1.into_weight()); + + // Enqueue a message + let origin = MessageOrigin::Here; + MessageQueue::enqueue_message(msg("weight=6"), origin); + // Load the current book + let book = BookStateFor::::get(origin); + assert_eq!(book.message_count, 1); + assert!(Pages::::contains_key(origin, 0)); + + // Mark the message as permanently overweight. + assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight()); + assert_eq!(QueueChanges::take(), vec![(origin, 1, 8)]); + assert_last_event::( + Event::OverweightEnqueued { + hash: ::Hashing::hash(b"weight=6"), + origin: MessageOrigin::Here, + message_index: 0, + page_index: 0, + } + .into(), + ); + + // Now try to execute it with too few weight. + let consumed = + ::execute_overweight(5.into_weight(), (origin, 0, 0)); + assert_eq!(consumed, Err(ExecuteOverweightError::InsufficientWeight)); + + // Execute it with enough weight. + assert_eq!(Pages::::iter().count(), 1); + assert!(QueueChanges::take().is_empty()); + let consumed = + ::execute_overweight(7.into_weight(), (origin, 0, 0)) + .unwrap(); + assert_eq!(consumed, 6.into_weight()); + assert_eq!(QueueChanges::take(), vec![(origin, 0, 0)]); + // There is no message left in the book. + let book = BookStateFor::::get(origin); + assert_eq!(book.message_count, 0); + // And no more pages. + assert_eq!(Pages::::iter().count(), 0); + + // Doing it again with enough weight will error. + let consumed = + ::execute_overweight(70.into_weight(), (origin, 0, 0)); + assert_eq!(consumed, Err(ExecuteOverweightError::NotFound)); + assert!(QueueChanges::take().is_empty()); + assert!(!Pages::::contains_key(origin, 0), "Page is gone"); + }); +} + +/// Checks that (un)knitting the ready ring works with just one queue. +/// +/// This case is interesting since it wraps and a lot of `mutate` now operate on the same object. +#[test] +fn ready_ring_knit_basic_works() { + use MessageOrigin::*; + + new_test_ext::().execute_with(|| { + BookStateFor::::insert(Here, empty_book::()); + + for i in 0..10 { + if i % 2 == 0 { + knit(&Here); + assert_ring(&[Here]); + } else { + unknit(&Here); + assert_ring(&[]); + } + } + assert_ring(&[]); + }); +} + +#[test] +fn ready_ring_knit_and_unknit_works() { + use MessageOrigin::*; + + new_test_ext::().execute_with(|| { + // Place three queues into the storage. + BookStateFor::::insert(Here, empty_book::()); + BookStateFor::::insert(There, empty_book::()); + BookStateFor::::insert(Everywhere(0), empty_book::()); + + // Knit them into the ready ring. + assert_ring(&[]); + knit(&Here); + assert_ring(&[Here]); + knit(&There); + assert_ring(&[Here, There]); + knit(&Everywhere(0)); + assert_ring(&[Here, There, Everywhere(0)]); + + // Now unknit… + unknit(&Here); + assert_ring(&[There, Everywhere(0)]); + unknit(&There); + assert_ring(&[Everywhere(0)]); + unknit(&Everywhere(0)); + assert_ring(&[]); + }); +} + +#[test] +fn enqueue_message_works() { + use MessageOrigin::*; + let max_msg_per_page = ::HeapSize::get() as u64 / + (ItemHeader::<::Size>::max_encoded_len() as u64 + 1); + + new_test_ext::().execute_with(|| { + // Enqueue messages which should fill three pages. + let n = max_msg_per_page * 3; + for i in 1..=n { + MessageQueue::enqueue_message(msg("a"), Here); + assert_eq!(QueueChanges::take(), vec![(Here, i, i)], "OnQueueChanged not called"); + } + assert_eq!(Pages::::iter().count(), 3); + + // Enqueue one more onto page 4. + MessageQueue::enqueue_message(msg("abc"), Here); + assert_eq!(QueueChanges::take(), vec![(Here, n + 1, n + 3)]); + assert_eq!(Pages::::iter().count(), 4); + + // Check the state. + assert_eq!(BookStateFor::::iter().count(), 1); + let book = BookStateFor::::get(Here); + assert_eq!(book.message_count, n + 1); + assert_eq!(book.size, n + 3); + assert_eq!((book.begin, book.end), (0, 4)); + assert_eq!(book.count as usize, Pages::::iter().count()); + }); +} + +#[test] +fn enqueue_messages_works() { + use MessageOrigin::*; + let max_msg_per_page = ::HeapSize::get() as u64 / + (ItemHeader::<::Size>::max_encoded_len() as u64 + 1); + + new_test_ext::().execute_with(|| { + // Enqueue messages which should fill three pages. + let n = max_msg_per_page * 3; + let msgs = vec![msg("a"); n as usize]; + + // Now queue all messages at once. + MessageQueue::enqueue_messages(msgs.into_iter(), Here); + // The changed handler should only be called once. + assert_eq!(QueueChanges::take(), vec![(Here, n, n)], "OnQueueChanged not called"); + assert_eq!(Pages::::iter().count(), 3); + + // Enqueue one more onto page 4. + MessageQueue::enqueue_message(msg("abc"), Here); + assert_eq!(QueueChanges::take(), vec![(Here, n + 1, n + 3)]); + assert_eq!(Pages::::iter().count(), 4); + + // Check the state. + assert_eq!(BookStateFor::::iter().count(), 1); + let book = BookStateFor::::get(Here); + assert_eq!(book.message_count, n + 1); + assert_eq!(book.size, n + 3); + assert_eq!((book.begin, book.end), (0, 4)); + assert_eq!(book.count as usize, Pages::::iter().count()); + }); +} diff --git a/frame/message-queue/src/weights.rs b/frame/message-queue/src/weights.rs new file mode 100644 index 0000000000000..cd9268ffde224 --- /dev/null +++ b/frame/message-queue/src/weights.rs @@ -0,0 +1,216 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Autogenerated weights for pallet_message_queue +//! +//! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev +//! DATE: 2022-12-08, STEPS: `50`, REPEAT: 20, LOW RANGE: `[]`, HIGH RANGE: `[]` +//! HOSTNAME: `bm3`, CPU: `Intel(R) Core(TM) i7-7700K CPU @ 4.20GHz` +//! EXECUTION: Some(Wasm), WASM-EXECUTION: Compiled, CHAIN: Some("dev"), DB CACHE: 1024 + +// Executed Command: +// /home/benchbot/cargo_target_dir/production/substrate +// benchmark +// pallet +// --steps=50 +// --repeat=20 +// --extrinsic=* +// --execution=wasm +// --wasm-execution=compiled +// --heap-pages=4096 +// --json-file=/var/lib/gitlab-runner/builds/zyw4fam_/0/parity/mirrors/substrate/.git/.artifacts/bench.json +// --pallet=pallet_message_queue +// --chain=dev +// --header=./HEADER-APACHE2 +// --output=./frame/message-queue/src/weights.rs +// --template=./.maintain/frame-weight-template.hbs + +#![cfg_attr(rustfmt, rustfmt_skip)] +#![allow(unused_parens)] +#![allow(unused_imports)] + +use frame_support::{traits::Get, weights::{Weight, constants::RocksDbWeight}}; +use sp_std::marker::PhantomData; + +/// Weight functions needed for pallet_message_queue. +pub trait WeightInfo { + fn ready_ring_knit() -> Weight; + fn ready_ring_unknit() -> Weight; + fn service_queue_base() -> Weight; + fn service_page_base_completion() -> Weight; + fn service_page_base_no_completion() -> Weight; + fn service_page_item() -> Weight; + fn bump_service_head() -> Weight; + fn reap_page() -> Weight; + fn execute_overweight_page_removed() -> Weight; + fn execute_overweight_page_updated() -> Weight; +} + +/// Weights for pallet_message_queue using the Substrate node and recommended hardware. +pub struct SubstrateWeight(PhantomData); +impl WeightInfo for SubstrateWeight { + // Storage: MessageQueue ServiceHead (r:1 w:0) + // Storage: MessageQueue BookStateFor (r:2 w:2) + fn ready_ring_knit() -> Weight { + // Minimum execution time: 12_330 nanoseconds. + Weight::from_ref_time(12_711_000) + .saturating_add(T::DbWeight::get().reads(3)) + .saturating_add(T::DbWeight::get().writes(2)) + } + // Storage: MessageQueue BookStateFor (r:2 w:2) + // Storage: MessageQueue ServiceHead (r:1 w:1) + fn ready_ring_unknit() -> Weight { + // Minimum execution time: 12_322 nanoseconds. + Weight::from_ref_time(12_560_000) + .saturating_add(T::DbWeight::get().reads(3)) + .saturating_add(T::DbWeight::get().writes(3)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + fn service_queue_base() -> Weight { + // Minimum execution time: 4_652 nanoseconds. + Weight::from_ref_time(4_848_000) + .saturating_add(T::DbWeight::get().reads(1)) + .saturating_add(T::DbWeight::get().writes(1)) + } + // Storage: MessageQueue Pages (r:1 w:1) + fn service_page_base_completion() -> Weight { + // Minimum execution time: 7_115 nanoseconds. + Weight::from_ref_time(7_407_000) + .saturating_add(T::DbWeight::get().reads(1)) + .saturating_add(T::DbWeight::get().writes(1)) + } + // Storage: MessageQueue Pages (r:1 w:1) + fn service_page_base_no_completion() -> Weight { + // Minimum execution time: 6_974 nanoseconds. + Weight::from_ref_time(7_200_000) + .saturating_add(T::DbWeight::get().reads(1)) + .saturating_add(T::DbWeight::get().writes(1)) + } + fn service_page_item() -> Weight { + // Minimum execution time: 79_657 nanoseconds. + Weight::from_ref_time(80_050_000) + } + // Storage: MessageQueue ServiceHead (r:1 w:1) + // Storage: MessageQueue BookStateFor (r:1 w:0) + fn bump_service_head() -> Weight { + // Minimum execution time: 7_598 nanoseconds. + Weight::from_ref_time(8_118_000) + .saturating_add(T::DbWeight::get().reads(2)) + .saturating_add(T::DbWeight::get().writes(1)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + // Storage: MessageQueue Pages (r:1 w:1) + fn reap_page() -> Weight { + // Minimum execution time: 60_562 nanoseconds. + Weight::from_ref_time(61_430_000) + .saturating_add(T::DbWeight::get().reads(2)) + .saturating_add(T::DbWeight::get().writes(2)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + // Storage: MessageQueue Pages (r:1 w:1) + fn execute_overweight_page_removed() -> Weight { + // Minimum execution time: 74_582 nanoseconds. + Weight::from_ref_time(75_445_000) + .saturating_add(T::DbWeight::get().reads(2)) + .saturating_add(T::DbWeight::get().writes(2)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + // Storage: MessageQueue Pages (r:1 w:1) + fn execute_overweight_page_updated() -> Weight { + // Minimum execution time: 87_526 nanoseconds. + Weight::from_ref_time(88_055_000) + .saturating_add(T::DbWeight::get().reads(2)) + .saturating_add(T::DbWeight::get().writes(2)) + } +} + +// For backwards compatibility and tests +impl WeightInfo for () { + // Storage: MessageQueue ServiceHead (r:1 w:0) + // Storage: MessageQueue BookStateFor (r:2 w:2) + fn ready_ring_knit() -> Weight { + // Minimum execution time: 12_330 nanoseconds. + Weight::from_ref_time(12_711_000) + .saturating_add(RocksDbWeight::get().reads(3)) + .saturating_add(RocksDbWeight::get().writes(2)) + } + // Storage: MessageQueue BookStateFor (r:2 w:2) + // Storage: MessageQueue ServiceHead (r:1 w:1) + fn ready_ring_unknit() -> Weight { + // Minimum execution time: 12_322 nanoseconds. + Weight::from_ref_time(12_560_000) + .saturating_add(RocksDbWeight::get().reads(3)) + .saturating_add(RocksDbWeight::get().writes(3)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + fn service_queue_base() -> Weight { + // Minimum execution time: 4_652 nanoseconds. + Weight::from_ref_time(4_848_000) + .saturating_add(RocksDbWeight::get().reads(1)) + .saturating_add(RocksDbWeight::get().writes(1)) + } + // Storage: MessageQueue Pages (r:1 w:1) + fn service_page_base_completion() -> Weight { + // Minimum execution time: 7_115 nanoseconds. + Weight::from_ref_time(7_407_000) + .saturating_add(RocksDbWeight::get().reads(1)) + .saturating_add(RocksDbWeight::get().writes(1)) + } + // Storage: MessageQueue Pages (r:1 w:1) + fn service_page_base_no_completion() -> Weight { + // Minimum execution time: 6_974 nanoseconds. + Weight::from_ref_time(7_200_000) + .saturating_add(RocksDbWeight::get().reads(1)) + .saturating_add(RocksDbWeight::get().writes(1)) + } + fn service_page_item() -> Weight { + // Minimum execution time: 79_657 nanoseconds. + Weight::from_ref_time(80_050_000) + } + // Storage: MessageQueue ServiceHead (r:1 w:1) + // Storage: MessageQueue BookStateFor (r:1 w:0) + fn bump_service_head() -> Weight { + // Minimum execution time: 7_598 nanoseconds. + Weight::from_ref_time(8_118_000) + .saturating_add(RocksDbWeight::get().reads(2)) + .saturating_add(RocksDbWeight::get().writes(1)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + // Storage: MessageQueue Pages (r:1 w:1) + fn reap_page() -> Weight { + // Minimum execution time: 60_562 nanoseconds. + Weight::from_ref_time(61_430_000) + .saturating_add(RocksDbWeight::get().reads(2)) + .saturating_add(RocksDbWeight::get().writes(2)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + // Storage: MessageQueue Pages (r:1 w:1) + fn execute_overweight_page_removed() -> Weight { + // Minimum execution time: 74_582 nanoseconds. + Weight::from_ref_time(75_445_000) + .saturating_add(RocksDbWeight::get().reads(2)) + .saturating_add(RocksDbWeight::get().writes(2)) + } + // Storage: MessageQueue BookStateFor (r:1 w:1) + // Storage: MessageQueue Pages (r:1 w:1) + fn execute_overweight_page_updated() -> Weight { + // Minimum execution time: 87_526 nanoseconds. + Weight::from_ref_time(88_055_000) + .saturating_add(RocksDbWeight::get().reads(2)) + .saturating_add(RocksDbWeight::get().writes(2)) + } +} diff --git a/frame/scheduler/Cargo.toml b/frame/scheduler/Cargo.toml index 86ca63c753bea..25ac602681cc0 100644 --- a/frame/scheduler/Cargo.toml +++ b/frame/scheduler/Cargo.toml @@ -19,6 +19,7 @@ frame-system = { version = "4.0.0-dev", default-features = false, path = "../sys sp-io = { version = "7.0.0", default-features = false, path = "../../primitives/io" } sp-runtime = { version = "7.0.0", default-features = false, path = "../../primitives/runtime" } sp-std = { version = "5.0.0", default-features = false, path = "../../primitives/std" } +sp-weights = { version = "4.0.0", default-features = false, path = "../../primitives/weights" } [dev-dependencies] pallet-preimage = { version = "4.0.0-dev", path = "../preimage" } @@ -42,5 +43,6 @@ std = [ "sp-io/std", "sp-runtime/std", "sp-std/std", + "sp-weights/std", ] try-runtime = ["frame-support/try-runtime"] diff --git a/frame/scheduler/src/lib.rs b/frame/scheduler/src/lib.rs index 78533540be98f..2e0d0c6be1db5 100644 --- a/frame/scheduler/src/lib.rs +++ b/frame/scheduler/src/lib.rs @@ -73,7 +73,6 @@ use frame_support::{ weights::{Weight, WeightMeter}, }; use frame_system::{self as system}; -pub use pallet::*; use scale_info::TypeInfo; use sp_io::hashing::blake2_256; use sp_runtime::{ @@ -81,6 +80,8 @@ use sp_runtime::{ BoundedVec, RuntimeDebug, }; use sp_std::{borrow::Borrow, cmp::Ordering, marker::PhantomData, prelude::*}; + +pub use pallet::*; pub use weights::WeightInfo; /// Just a simple index for naming period tasks. diff --git a/frame/support/src/traits.rs b/frame/support/src/traits.rs index e5ba98fe0c5bb..63c86c1f68459 100644 --- a/frame/support/src/traits.rs +++ b/frame/support/src/traits.rs @@ -112,6 +112,12 @@ pub use voting::{ mod preimages; pub use preimages::{Bounded, BoundedInline, FetchResult, Hash, QueryPreimage, StorePreimage}; +mod messages; +pub use messages::{ + EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, + ServiceQueues, +}; + #[cfg(feature = "try-runtime")] mod try_runtime; #[cfg(feature = "try-runtime")] diff --git a/frame/support/src/traits/messages.rs b/frame/support/src/traits/messages.rs new file mode 100644 index 0000000000000..9b86c421ad9e0 --- /dev/null +++ b/frame/support/src/traits/messages.rs @@ -0,0 +1,202 @@ +// This file is part of Substrate. + +// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Traits for managing message queuing and handling. + +use codec::{Decode, Encode, FullCodec, MaxEncodedLen}; +use scale_info::TypeInfo; +use sp_core::{ConstU32, Get, TypedGet}; +use sp_runtime::{traits::Convert, BoundedSlice, RuntimeDebug}; +use sp_std::{fmt::Debug, marker::PhantomData, prelude::*}; +use sp_weights::Weight; + +/// Errors that can happen when attempting to process a message with +/// [`ProcessMessage::process_message()`]. +#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug)] +pub enum ProcessMessageError { + /// The message data format is unknown (e.g. unrecognised header) + BadFormat, + /// The message data is bad (e.g. decoding returns an error). + Corrupt, + /// The message format is unsupported (e.g. old XCM version). + Unsupported, + /// Message processing was not attempted because it was not certain that the weight limit + /// would be respected. The parameter gives the maximum weight which the message could take + /// to process. + Overweight(Weight), +} + +/// Can process messages from a specific origin. +pub trait ProcessMessage { + /// The transport from where a message originates. + type Origin: FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug; + + /// Process the given message, using no more than `weight_limit` in weight to do so. + fn process_message( + message: &[u8], + origin: Self::Origin, + weight_limit: Weight, + ) -> Result<(bool, Weight), ProcessMessageError>; +} + +/// Errors that can happen when attempting to execute an overweight message with +/// [`ServiceQueues::execute_overweight()`]. +#[derive(Eq, PartialEq, RuntimeDebug)] +pub enum ExecuteOverweightError { + /// The referenced message was not found. + NotFound, + /// The available weight was insufficient to execute the message. + InsufficientWeight, +} + +/// Can service queues and execute overweight messages. +pub trait ServiceQueues { + /// Addresses a specific overweight message. + type OverweightMessageAddress; + + /// Service all message queues in some fair manner. + /// + /// - `weight_limit`: The maximum amount of dynamic weight that this call can use. + /// + /// Returns the dynamic weight used by this call; is never greater than `weight_limit`. + fn service_queues(weight_limit: Weight) -> Weight; + + /// Executes a message that could not be executed by [`Self::service_queues()`] because it was + /// temporarily overweight. + fn execute_overweight( + _weight_limit: Weight, + _address: Self::OverweightMessageAddress, + ) -> Result { + Err(ExecuteOverweightError::NotFound) + } +} + +/// The resource footprint of a queue. +#[derive(Default, Copy, Clone, Eq, PartialEq, RuntimeDebug)] +pub struct Footprint { + pub count: u64, + pub size: u64, +} + +/// Can enqueue messages for multiple origins. +pub trait EnqueueMessage { + /// The maximal length any enqueued message may have. + type MaxMessageLen: Get; + + /// Enqueue a single `message` from a specific `origin`. + fn enqueue_message(message: BoundedSlice, origin: Origin); + + /// Enqueue multiple `messages` from a specific `origin`. + fn enqueue_messages<'a>( + messages: impl Iterator>, + origin: Origin, + ); + + /// Any remaining unprocessed messages should happen only lazily, not proactively. + fn sweep_queue(origin: Origin); + + /// Return the state footprint of the given queue. + fn footprint(origin: Origin) -> Footprint; +} + +impl EnqueueMessage for () { + type MaxMessageLen = ConstU32<0>; + fn enqueue_message(_: BoundedSlice, _: Origin) {} + fn enqueue_messages<'a>( + _: impl Iterator>, + _: Origin, + ) { + } + fn sweep_queue(_: Origin) {} + fn footprint(_: Origin) -> Footprint { + Footprint::default() + } +} + +/// Transform the origin of an [`EnqueueMessage`] via `C::convert`. +pub struct TransformOrigin(PhantomData<(E, O, N, C)>); +impl, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert> EnqueueMessage + for TransformOrigin +{ + type MaxMessageLen = E::MaxMessageLen; + + fn enqueue_message(message: BoundedSlice, origin: N) { + E::enqueue_message(message, C::convert(origin)); + } + + fn enqueue_messages<'a>( + messages: impl Iterator>, + origin: N, + ) { + E::enqueue_messages(messages, C::convert(origin)); + } + + fn sweep_queue(origin: N) { + E::sweep_queue(C::convert(origin)); + } + + fn footprint(origin: N) -> Footprint { + E::footprint(C::convert(origin)) + } +} + +/// Handles incoming messages for a single origin. +pub trait HandleMessage { + /// The maximal length any enqueued message may have. + type MaxMessageLen: Get; + + /// Enqueue a single `message` with an implied origin. + fn handle_message(message: BoundedSlice); + + /// Enqueue multiple `messages` from an implied origin. + fn handle_messages<'a>( + messages: impl Iterator>, + ); + + /// Any remaining unprocessed messages should happen only lazily, not proactively. + fn sweep_queue(); + + /// Return the state footprint of the queue. + fn footprint() -> Footprint; +} + +/// Adapter type to transform an [`EnqueueMessage`] with an origin into a [`HandleMessage`] impl. +pub struct EnqueueWithOrigin(PhantomData<(E, O)>); +impl, O: TypedGet> HandleMessage for EnqueueWithOrigin +where + O::Type: MaxEncodedLen, +{ + type MaxMessageLen = E::MaxMessageLen; + + fn handle_message(message: BoundedSlice) { + E::enqueue_message(message, O::get()); + } + + fn handle_messages<'a>( + messages: impl Iterator>, + ) { + E::enqueue_messages(messages, O::get()); + } + + fn sweep_queue() { + E::sweep_queue(O::get()); + } + + fn footprint() -> Footprint { + E::footprint(O::get()) + } +} diff --git a/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen.stderr b/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen.stderr index 42ef5a34e4c30..999d8585c221a 100644 --- a/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen.stderr +++ b/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen.stderr @@ -28,7 +28,7 @@ error[E0277]: the trait bound `Bar: EncodeLike` is not satisfied <&[(T,)] as EncodeLike>> <&[(T,)] as EncodeLike>> <&[T] as EncodeLike>> - and 278 others + and 279 others = note: required for `Bar` to implement `FullEncode` = note: required for `Bar` to implement `FullCodec` = note: required for `frame_support::pallet_prelude::StorageValue<_GeneratedPrefixForStorageFoo, Bar>` to implement `PartialStorageInfoTrait` @@ -69,7 +69,7 @@ error[E0277]: the trait bound `Bar: TypeInfo` is not satisfied (A, B, C, D) (A, B, C, D, E) (A, B, C, D, E, F) - and 161 others + and 162 others = note: required for `Bar` to implement `StaticTypeInfo` = note: required for `frame_support::pallet_prelude::StorageValue<_GeneratedPrefixForStorageFoo, Bar>` to implement `StorageEntryMetadataBuilder` @@ -103,7 +103,7 @@ error[E0277]: the trait bound `Bar: EncodeLike` is not satisfied <&[(T,)] as EncodeLike>> <&[(T,)] as EncodeLike>> <&[T] as EncodeLike>> - and 278 others + and 279 others = note: required for `Bar` to implement `FullEncode` = note: required for `Bar` to implement `FullCodec` = note: required for `frame_support::pallet_prelude::StorageValue<_GeneratedPrefixForStorageFoo, Bar>` to implement `StorageEntryMetadataBuilder` diff --git a/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen_unnamed.stderr b/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen_unnamed.stderr index 461d63ebb0d9c..e2870ffb9e86f 100644 --- a/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen_unnamed.stderr +++ b/frame/support/test/tests/pallet_ui/storage_ensure_span_are_ok_on_wrong_gen_unnamed.stderr @@ -28,7 +28,7 @@ error[E0277]: the trait bound `Bar: EncodeLike` is not satisfied <&[(T,)] as EncodeLike>> <&[(T,)] as EncodeLike>> <&[T] as EncodeLike>> - and 278 others + and 279 others = note: required for `Bar` to implement `FullEncode` = note: required for `Bar` to implement `FullCodec` = note: required for `frame_support::pallet_prelude::StorageValue<_GeneratedPrefixForStorageFoo, Bar>` to implement `PartialStorageInfoTrait` @@ -69,7 +69,7 @@ error[E0277]: the trait bound `Bar: TypeInfo` is not satisfied (A, B, C, D) (A, B, C, D, E) (A, B, C, D, E, F) - and 161 others + and 162 others = note: required for `Bar` to implement `StaticTypeInfo` = note: required for `frame_support::pallet_prelude::StorageValue<_GeneratedPrefixForStorageFoo, Bar>` to implement `StorageEntryMetadataBuilder` @@ -103,7 +103,7 @@ error[E0277]: the trait bound `Bar: EncodeLike` is not satisfied <&[(T,)] as EncodeLike>> <&[(T,)] as EncodeLike>> <&[T] as EncodeLike>> - and 278 others + and 279 others = note: required for `Bar` to implement `FullEncode` = note: required for `Bar` to implement `FullCodec` = note: required for `frame_support::pallet_prelude::StorageValue<_GeneratedPrefixForStorageFoo, Bar>` to implement `StorageEntryMetadataBuilder` diff --git a/primitives/core/src/bounded/bounded_vec.rs b/primitives/core/src/bounded/bounded_vec.rs index 2f39f3340ce50..6e1e1c7cfda64 100644 --- a/primitives/core/src/bounded/bounded_vec.rs +++ b/primitives/core/src/bounded/bounded_vec.rs @@ -675,6 +675,13 @@ impl> BoundedVec { } } +impl BoundedVec { + /// Return a [`BoundedSlice`] with the content and bound of [`Self`]. + pub fn as_bounded_slice(&self) -> BoundedSlice { + BoundedSlice(&self.0[..], PhantomData::default()) + } +} + impl Default for BoundedVec { fn default() -> Self { // the bound cannot be below 0, which is satisfied by an empty vector diff --git a/primitives/weights/src/weight_meter.rs b/primitives/weights/src/weight_meter.rs index d03e72968bb09..17c5da1502e9e 100644 --- a/primitives/weights/src/weight_meter.rs +++ b/primitives/weights/src/weight_meter.rs @@ -71,6 +71,12 @@ impl WeightMeter { time.max(pov) } + /// Consume some weight and defensively fail if it is over the limit. Saturate in any case. + pub fn defensive_saturating_accrue(&mut self, w: Weight) { + self.consumed.saturating_accrue(w); + debug_assert!(self.consumed.all_lte(self.limit), "Weight counter overflow"); + } + /// Consume the given weight after checking that it can be consumed. Otherwise do nothing. pub fn check_accrue(&mut self, w: Weight) -> bool { self.consumed.checked_add(&w).map_or(false, |test| {