Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FRAME] MQ processor should be transactional #5198

Merged
merged 17 commits into from
Sep 2, 2024
Merged
10 changes: 10 additions & 0 deletions prdoc/pr_5198.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: "MQ processor should be transactional"

doc:
- audience: Runtime User
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- audience: Runtime User
- audience: [Runtime User, Runtime Dev]

description: |
Enforce transactional processing on pallet Message Queue Processor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Enforce transactional processing on pallet Message Queue Processor
Enforce transactional processing on pallet Message Queue Processor.
Storage changes that were done while processing a message will now be rolled back
when the processing returns an error. `Ok(false)` will not revert, only `Err(_)`.


crates:
- name: pallet-message-queue
bump: patch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bump: patch
bump: major

Its a logical breaking change, better safe than sorry for these things.

1 change: 1 addition & 0 deletions substrate/frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ fn stress_test_recursive() {
TotalEnqueued::set(TotalEnqueued::get() + enqueued);
Enqueued::set(Enqueued::get() + enqueued);
Called::set(Called::get() + 1);
Ok(())
}));

build_and_execute::<Test>(|| {
Expand Down
19 changes: 17 additions & 2 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_core::{defer, H256};
use sp_runtime::{
traits::{One, Zero},
SaturatedConversion, Saturating,
SaturatedConversion, Saturating, TransactionOutcome,
};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;
Expand Down Expand Up @@ -1447,7 +1447,22 @@ impl<T: Config> Pallet<T> {
use ProcessMessageError::*;
let prev_consumed = meter.consumed();

match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) {
let transaction =
storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
let res =
T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
match &res {
Ok(_) => TransactionOutcome::Commit(Ok(res)),
Err(_) => TransactionOutcome::Rollback(Ok(res)),
}
});

let transaction = match transaction {
Ok(result) => result,
_ => unreachable!(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With current storage::with_transaction implementation, this is indeed unreachable, but if the internals of that function change, this will crash the runtime.

I know, highly unlikely, but still I would prefer to:

  • debug_assert!() to have it crash in tests if the invariant is ever broken,
  • return some error MessageExecutionStatus in prod without crashing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea it should be defensive and then just return an error or so. Having panics in the runtime is never good.

};

match transaction {
Err(Overweight(w)) if w.any_gt(overweight_limit) => {
// Permanently overweight.
Self::deposit_event(Event::<T>::OverweightEnqueued {
Expand Down
10 changes: 7 additions & 3 deletions substrate/frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ impl ProcessMessage for RecordingMessageProcessor {
if meter.try_consume(required).is_ok() {
if let Some(p) = message.strip_prefix(&b"callback="[..]) {
let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8");
Callback::get()(&origin, s.parse().expect("Expected an u32"));
if let Err(()) = Callback::get()(&origin, s.parse().expect("Expected an u32")) {
return Err(ProcessMessageError::Corrupt)
}
}
let mut m = MessagesProcessed::get();
m.push((message.to_vec(), origin));
Expand All @@ -197,7 +199,7 @@ impl ProcessMessage for RecordingMessageProcessor {
}

parameter_types! {
pub static Callback: Box<fn (&MessageOrigin, u32)> = Box::new(|_, _| {});
pub static Callback: Box<fn (&MessageOrigin, u32) -> Result<(), ()>> = Box::new(|_, _| { Ok(()) });
pub static IgnoreStackOvError: bool = false;
}

Expand Down Expand Up @@ -252,7 +254,9 @@ impl ProcessMessage for CountingMessageProcessor {
if meter.try_consume(required).is_ok() {
if let Some(p) = message.strip_prefix(&b"callback="[..]) {
let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8");
Callback::get()(&origin, s.parse().expect("Expected an u32"));
if let Err(()) = Callback::get()(&origin, s.parse().expect("Expected an u32")) {
return Err(ProcessMessageError::Corrupt)
}
}
NumMessagesProcessed::set(NumMessagesProcessed::get() + 1);
Ok(true)
Expand Down
68 changes: 51 additions & 17 deletions substrate/frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,6 +1675,7 @@ fn regression_issue_2319() {
build_and_execute::<Test>(|| {
Callback::set(Box::new(|_, _| {
MessageQueue::enqueue_message(mock_helpers::msg("anothermessage"), There);
Ok(())
}));

use MessageOrigin::*;
Expand All @@ -1695,23 +1696,26 @@ fn regression_issue_2319() {
#[test]
fn recursive_enqueue_works() {
build_and_execute::<Test>(|| {
Callback::set(Box::new(|o, i| match i {
0 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o);
},
1 => {
for _ in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o);
}
for i in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into());
}
},
2 | 3 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o);
},
4 => (),
_ => unreachable!(),
Callback::set(Box::new(|o, i| {
match i {
0 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o);
},
1 => {
for _ in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o);
}
for i in 0..100 {
MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into());
}
},
2 | 3 => {
MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o);
},
4 => (),
_ => unreachable!(),
};
Ok(())
}));

MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);
Expand All @@ -1735,6 +1739,7 @@ fn recursive_service_is_forbidden() {
// This call will fail since it is recursive. But it will not mess up the state.
assert_storage_noop!(MessageQueue::service_queues(10.into_weight()));
MessageQueue::enqueue_message(msg("m2"), There);
Ok(())
}));

for _ in 0..5 {
Expand Down Expand Up @@ -1778,6 +1783,7 @@ fn recursive_overweight_while_service_is_forbidden() {
),
ExecuteOverweightError::RecursiveDisallowed
);
Ok(())
}));

MessageQueue::enqueue_message(msg("weight=10"), There);
Expand All @@ -1800,6 +1806,7 @@ fn recursive_reap_page_is_forbidden() {
Callback::set(Box::new(|_, _| {
// This call will fail since it is recursive. But it will not mess up the state.
assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::<Test>::RecursiveDisallowed);
Ok(())
}));

// Create 10 pages more than the stale limit.
Expand Down Expand Up @@ -1975,3 +1982,30 @@ fn execute_overweight_keeps_stack_ov_message() {
System::reset_events();
});
}

#[test]
fn process_message_error_reverts_storage_changes() {
build_and_execute::<Test>(|| {
assert!(!sp_io::storage::exists(b"key"), "Key should not exist");

Callback::set(Box::new(|_, _| {
sp_io::storage::set(b"key", b"value");
Err(())
}));

MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);
MessageQueue::service_queues(10.into_weight());

assert!(!sp_io::storage::exists(b"key"), "Key should have been rolled back");
});
}

#[test]
fn process_message_ok_false_keeps_storage_changes() {
// FAIL-CI TODO
}

#[test]
fn process_message_ok_true_keeps_storage_changes() {
// FAIL-CI TODO
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like these are the only missing pieces to get this in?

Loading