Skip to content

Commit

Permalink
[FRAME] MQ processor should be transactional (#5198)
Browse files Browse the repository at this point in the history
closes #2441

Polkadot address: 12GyGD3QhT4i2JJpNzvMf96sxxBLWymz4RdGCxRH5Rj5agKW

---------

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
3 people authored Sep 2, 2024
1 parent c8015b2 commit f6eeca9
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 22 deletions.
13 changes: 13 additions & 0 deletions prdoc/pr_5198.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
title: "MQ processor should be transactional"

doc:
- audience: [Runtime User, Runtime Dev]
description: |
Enforce transactional processing on pallet Message Queue Processor.

Storage changes that were done while processing a message will now be rolled back
when the processing returns an error. `Ok(false)` will not revert, only `Err(_)`.

crates:
- name: pallet-message-queue
bump: major
1 change: 1 addition & 0 deletions substrate/frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ fn stress_test_recursive() {
TotalEnqueued::set(TotalEnqueued::get() + enqueued);
Enqueued::set(Enqueued::get() + enqueued);
Called::set(Called::get() + 1);
Ok(())
}));

build_and_execute::<Test>(|| {
Expand Down
26 changes: 24 additions & 2 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_core::{defer, H256};
use sp_runtime::{
traits::{One, Zero},
SaturatedConversion, Saturating,
SaturatedConversion, Saturating, TransactionOutcome,
};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;
Expand Down Expand Up @@ -1435,6 +1435,8 @@ impl<T: Config> Pallet<T> {
/// The base weight of this function needs to be accounted for by the caller. `weight` is the
/// remaining weight to process the message. `overweight_limit` is the maximum weight that a
/// message can ever consume. Messages above this limit are marked as permanently overweight.
/// This process is also transactional, any form of error that occurs in processing a message
/// causes storage changes to be rolled back.
fn process_message_payload(
origin: MessageOriginOf<T>,
page_index: PageIndex,
Expand All @@ -1447,7 +1449,27 @@ impl<T: Config> Pallet<T> {
use ProcessMessageError::*;
let prev_consumed = meter.consumed();

match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) {
let transaction =
storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
let res =
T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
match &res {
Ok(_) => TransactionOutcome::Commit(Ok(res)),
Err(_) => TransactionOutcome::Rollback(Ok(res)),
}
});

let transaction = match transaction {
Ok(result) => result,
_ => {
defensive!(
"Error occurred processing message, storage changes will be rolled back"
);
return MessageExecutionStatus::Unprocessable { permanent: true }
},
};

match transaction {
Err(Overweight(w)) if w.any_gt(overweight_limit) => {
// Permanently overweight.
Self::deposit_event(Event::<T>::OverweightEnqueued {
Expand Down
15 changes: 12 additions & 3 deletions substrate/frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,15 @@ impl ProcessMessage for RecordingMessageProcessor {
if meter.try_consume(required).is_ok() {
if let Some(p) = message.strip_prefix(&b"callback="[..]) {
let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8");
Callback::get()(&origin, s.parse().expect("Expected an u32"));
if let Err(()) = Callback::get()(&origin, s.parse().expect("Expected an u32")) {
return Err(ProcessMessageError::Corrupt)
}

if s.contains("000") {
return Ok(false)
}
}

let mut m = MessagesProcessed::get();
m.push((message.to_vec(), origin));
MessagesProcessed::set(m);
Expand All @@ -197,7 +204,7 @@ impl ProcessMessage for RecordingMessageProcessor {
}

parameter_types! {
pub static Callback: Box<fn (&MessageOrigin, u32)> = Box::new(|_, _| {});
pub static Callback: Box<fn (&MessageOrigin, u32) -> Result<(), ()>> = Box::new(|_, _| { Ok(()) });
pub static IgnoreStackOvError: bool = false;
}

Expand Down Expand Up @@ -252,7 +259,9 @@ impl ProcessMessage for CountingMessageProcessor {
if meter.try_consume(required).is_ok() {
if let Some(p) = message.strip_prefix(&b"callback="[..]) {
let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8");
Callback::get()(&origin, s.parse().expect("Expected an u32"));
if let Err(()) = Callback::get()(&origin, s.parse().expect("Expected an u32")) {
return Err(ProcessMessageError::Corrupt)
}
}
NumMessagesProcessed::set(NumMessagesProcessed::get() + 1);
Ok(true)
Expand Down
93 changes: 76 additions & 17 deletions substrate/frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,6 +1675,7 @@ fn regression_issue_2319() {
build_and_execute::<Test>(|| {
Callback::set(Box::new(|_, _| {
MessageQueue::enqueue_message(mock_helpers::msg("anothermessage"), There);
Ok(())
}));

use MessageOrigin::*;
Expand All @@ -1695,23 +1696,26 @@ fn regression_issue_2319() {
#[test]
fn recursive_enqueue_works() {
build_and_execute::<Test>(|| {
Callback::set(Box::new(|o, i| match i {
0 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o);
},
1 => {
for _ in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o);
}
for i in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into());
}
},
2 | 3 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o);
},
4 => (),
_ => unreachable!(),
Callback::set(Box::new(|o, i| {
match i {
0 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o);
},
1 => {
for _ in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o);
}
for i in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into());
}
},
2 | 3 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o);
},
4 => (),
_ => unreachable!(),
};
Ok(())
}));

MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);
Expand All @@ -1735,6 +1739,7 @@ fn recursive_service_is_forbidden() {
// This call will fail since it is recursive. But it will not mess up the state.
assert_storage_noop!(MessageQueue::service_queues(10.into_weight()));
MessageQueue::enqueue_message(msg("m2"), There);
Ok(())
}));

for _ in 0..5 {
Expand Down Expand Up @@ -1778,6 +1783,7 @@ fn recursive_overweight_while_service_is_forbidden() {
),
ExecuteOverweightError::RecursiveDisallowed
);
Ok(())
}));

MessageQueue::enqueue_message(msg("weight=10"), There);
Expand All @@ -1800,6 +1806,7 @@ fn recursive_reap_page_is_forbidden() {
Callback::set(Box::new(|_, _| {
// This call will fail since it is recursive. But it will not mess up the state.
assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::<Test>::RecursiveDisallowed);
Ok(())
}));

// Create 10 pages more than the stale limit.
Expand Down Expand Up @@ -1975,3 +1982,55 @@ fn execute_overweight_keeps_stack_ov_message() {
System::reset_events();
});
}

#[test]
fn process_message_error_reverts_storage_changes() {
build_and_execute::<Test>(|| {
assert!(!sp_io::storage::exists(b"key"), "Key should not exist");

Callback::set(Box::new(|_, _| {
sp_io::storage::set(b"key", b"value");
Err(())
}));

MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);
MessageQueue::service_queues(10.into_weight());

assert!(!sp_io::storage::exists(b"key"), "Key should have been rolled back");
});
}

#[test]
fn process_message_ok_false_keeps_storage_changes() {
build_and_execute::<Test>(|| {
assert!(!sp_io::storage::exists(b"key"), "Key should not exist");

Callback::set(Box::new(|_, _| {
sp_io::storage::set(b"key", b"value");
Ok(())
}));

// 000 will make it return `Ok(false)`
MessageQueue::enqueue_message(msg("callback=000"), MessageOrigin::Here);
MessageQueue::service_queues(10.into_weight());

assert_eq!(sp_io::storage::exists(b"key"), true);
});
}

#[test]
fn process_message_ok_true_keeps_storage_changes() {
build_and_execute::<Test>(|| {
assert!(!sp_io::storage::exists(b"key"), "Key should not exist");

Callback::set(Box::new(|_, _| {
sp_io::storage::set(b"key", b"value");
Ok(())
}));

MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);
MessageQueue::service_queues(10.into_weight());

assert_eq!(sp_io::storage::exists(b"key"), true);
});
}

0 comments on commit f6eeca9

Please sign in to comment.