From fdaf250750236eff95f4df6e076e77dc7dc672f4 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 31 Jul 2024 12:40:28 +0100 Subject: [PATCH 01/13] make process_message transactional --- Cargo.lock | 1 + substrate/frame/message-queue/Cargo.toml | 2 + substrate/frame/message-queue/src/lib.rs | 99 +++++++++++++----------- 3 files changed, 58 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7466975fa428..ab69ea211bec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10921,6 +10921,7 @@ dependencies = [ "rand_distr", "scale-info", "serde", + "sp-api", "sp-arithmetic", "sp-core", "sp-crypto-hashing", diff --git a/substrate/frame/message-queue/Cargo.toml b/substrate/frame/message-queue/Cargo.toml index a6de61d70abf..11902231ea93 100644 --- a/substrate/frame/message-queue/Cargo.toml +++ b/substrate/frame/message-queue/Cargo.toml @@ -23,6 +23,7 @@ sp-io = { workspace = true } sp-runtime = { workspace = true } sp-arithmetic = { workspace = true } sp-weights = { workspace = true } +sp-api = { workspace = true } frame-benchmarking = { optional = true, workspace = true } frame-support = { workspace = true } @@ -50,6 +51,7 @@ std = [ "sp-runtime/std", "sp-tracing/std", "sp-weights/std", + "sp-api/std", ] runtime-benchmarks = [ "frame-benchmarking/runtime-benchmarks", diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 2dbffef7e5a2..e34469682384 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -221,12 +221,10 @@ use frame_support::{ use frame_system::pallet_prelude::*; pub use pallet::*; use scale_info::TypeInfo; +use frame_support::traits::ProcessMessageError::{BadFormat, Corrupt, StackLimitReached, Unsupported, Yield}; use sp_arithmetic::traits::{BaseArithmetic, Unsigned}; use sp_core::{defer, H256}; -use sp_runtime::{ - traits::{One, Zero}, - SaturatedConversion, Saturating, -}; +use sp_runtime::{traits::{One, Zero}, SaturatedConversion, Saturating, TransactionOutcome}; use sp_weights::WeightMeter; pub use weights::WeightInfo; @@ -1447,46 +1445,59 @@ impl Pallet { use ProcessMessageError::*; let prev_consumed = meter.consumed(); - match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) { - Err(Overweight(w)) if w.any_gt(overweight_limit) => { - // Permanently overweight. - Self::deposit_event(Event::::OverweightEnqueued { - id, - origin, - page_index, - message_index, - }); - MessageExecutionStatus::Overweight - }, - Err(Overweight(_)) => { - // Temporarily overweight - save progress and stop processing this - // queue. - MessageExecutionStatus::InsufficientWeight - }, - Err(Yield) => { - // Processing should be reattempted later. - MessageExecutionStatus::Unprocessable { permanent: false } - }, - Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => { - // Permanent error - drop - Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); - MessageExecutionStatus::Unprocessable { permanent: true } - }, - Err(error @ StackLimitReached) => { - Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); - MessageExecutionStatus::StackLimitReached - }, - Ok(success) => { - // Success - let weight_used = meter.consumed().saturating_sub(prev_consumed); - Self::deposit_event(Event::::Processed { - id: id.into(), - origin, - weight_used, - success, - }); - MessageExecutionStatus::Processed - }, + let transaction = storage::with_transaction(|| -> sp_api::TransactionOutcome> { + let res = T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id); + match &res { + Ok(_) => TransactionOutcome::Commit(Ok(res)), + Err(_) => TransactionOutcome::Rollback(Ok(res)), + } + }); + + let transaction = match transaction { + Ok(result) => result, + Err(_) => return MessageExecutionStatus::Unprocessable { permanent: false }, + }; + + match transaction { + Err(Overweight(w)) if w.any_gt(overweight_limit) => { + // Permanently overweight. + Self::deposit_event(Event::::OverweightEnqueued { + id, + origin, + page_index, + message_index, + }); + MessageExecutionStatus::Overweight + }, + Err(Overweight(_)) => { + // Temporarily overweight - save progress and stop processing this + // queue. + MessageExecutionStatus::InsufficientWeight + }, + Err(Yield) => { + // Processing should be reattempted later. + MessageExecutionStatus::Unprocessable { permanent: false } + }, + Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => { + // Permanent error - drop + Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); + MessageExecutionStatus::Unprocessable { permanent: true } + }, + Err(error @ StackLimitReached) => { + Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); + MessageExecutionStatus::StackLimitReached + }, + Ok(success) => { + // Success + let weight_used = meter.consumed().saturating_sub(prev_consumed); + Self::deposit_event(Event::::Processed { + id: id.into(), + origin, + weight_used, + success, + }); + MessageExecutionStatus::Processed + }, } } } From f2bffb7adea77c59e6b5ee4f13232cdfa0aaaec0 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 31 Jul 2024 12:41:25 +0100 Subject: [PATCH 02/13] fmt --- substrate/frame/message-queue/src/lib.rs | 106 ++++++++++++----------- 1 file changed, 56 insertions(+), 50 deletions(-) diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index e34469682384..a94012a7a0a9 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -213,18 +213,21 @@ use frame_support::{ pallet_prelude::*, traits::{ Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage, - ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint, - QueuePausedQuery, ServiceQueues, + ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, + ProcessMessageError::{BadFormat, Corrupt, StackLimitReached, Unsupported, Yield}, + QueueFootprint, QueuePausedQuery, ServiceQueues, }, BoundedSlice, CloneNoBound, DefaultNoBound, }; use frame_system::pallet_prelude::*; pub use pallet::*; use scale_info::TypeInfo; -use frame_support::traits::ProcessMessageError::{BadFormat, Corrupt, StackLimitReached, Unsupported, Yield}; use sp_arithmetic::traits::{BaseArithmetic, Unsigned}; use sp_core::{defer, H256}; -use sp_runtime::{traits::{One, Zero}, SaturatedConversion, Saturating, TransactionOutcome}; +use sp_runtime::{ + traits::{One, Zero}, + SaturatedConversion, Saturating, TransactionOutcome, +}; use sp_weights::WeightMeter; pub use weights::WeightInfo; @@ -1445,13 +1448,16 @@ impl Pallet { use ProcessMessageError::*; let prev_consumed = meter.consumed(); - let transaction = storage::with_transaction(|| -> sp_api::TransactionOutcome> { - let res = T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id); - match &res { - Ok(_) => TransactionOutcome::Commit(Ok(res)), - Err(_) => TransactionOutcome::Rollback(Ok(res)), - } - }); + let transaction = storage::with_transaction( + || -> sp_api::TransactionOutcome> { + let res = + T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id); + match &res { + Ok(_) => TransactionOutcome::Commit(Ok(res)), + Err(_) => TransactionOutcome::Rollback(Ok(res)), + } + }, + ); let transaction = match transaction { Ok(result) => result, @@ -1459,45 +1465,45 @@ impl Pallet { }; match transaction { - Err(Overweight(w)) if w.any_gt(overweight_limit) => { - // Permanently overweight. - Self::deposit_event(Event::::OverweightEnqueued { - id, - origin, - page_index, - message_index, - }); - MessageExecutionStatus::Overweight - }, - Err(Overweight(_)) => { - // Temporarily overweight - save progress and stop processing this - // queue. - MessageExecutionStatus::InsufficientWeight - }, - Err(Yield) => { - // Processing should be reattempted later. - MessageExecutionStatus::Unprocessable { permanent: false } - }, - Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => { - // Permanent error - drop - Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); - MessageExecutionStatus::Unprocessable { permanent: true } - }, - Err(error @ StackLimitReached) => { - Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); - MessageExecutionStatus::StackLimitReached - }, - Ok(success) => { - // Success - let weight_used = meter.consumed().saturating_sub(prev_consumed); - Self::deposit_event(Event::::Processed { - id: id.into(), - origin, - weight_used, - success, - }); - MessageExecutionStatus::Processed - }, + Err(Overweight(w)) if w.any_gt(overweight_limit) => { + // Permanently overweight. + Self::deposit_event(Event::::OverweightEnqueued { + id, + origin, + page_index, + message_index, + }); + MessageExecutionStatus::Overweight + }, + Err(Overweight(_)) => { + // Temporarily overweight - save progress and stop processing this + // queue. + MessageExecutionStatus::InsufficientWeight + }, + Err(Yield) => { + // Processing should be reattempted later. + MessageExecutionStatus::Unprocessable { permanent: false } + }, + Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => { + // Permanent error - drop + Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); + MessageExecutionStatus::Unprocessable { permanent: true } + }, + Err(error @ StackLimitReached) => { + Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); + MessageExecutionStatus::StackLimitReached + }, + Ok(success) => { + // Success + let weight_used = meter.consumed().saturating_sub(prev_consumed); + Self::deposit_event(Event::::Processed { + id: id.into(), + origin, + weight_used, + success, + }); + MessageExecutionStatus::Processed + }, } } } From 825c8e96818f7dd80d64295c070a171f1fc4640d Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 31 Jul 2024 12:47:49 +0100 Subject: [PATCH 03/13] pr doc --- prdoc/pr_5198.prdoc | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 prdoc/pr_5198.prdoc diff --git a/prdoc/pr_5198.prdoc b/prdoc/pr_5198.prdoc new file mode 100644 index 000000000000..9f7d59e39149 --- /dev/null +++ b/prdoc/pr_5198.prdoc @@ -0,0 +1,10 @@ +title: "MQ processor should be transactional" + +doc: + - audience: Runtime User + description: | + Enforce transactional processing on pallet Message Queue Processor + +crates: + - name: pallet-message-queue + bump: patch \ No newline at end of file From 970253f9788bd63465df9b8687453d0331176dbc Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 31 Jul 2024 12:52:18 +0100 Subject: [PATCH 04/13] unreachable match case --- substrate/frame/message-queue/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index a94012a7a0a9..0c40bf4dadb8 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -1461,7 +1461,7 @@ impl Pallet { let transaction = match transaction { Ok(result) => result, - Err(_) => return MessageExecutionStatus::Unprocessable { permanent: false }, + _ => unreachable!(), }; match transaction { From ca4ae3d11c963f320ded77c20896cdcd36d4c19e Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 31 Jul 2024 12:55:58 +0100 Subject: [PATCH 05/13] nit --- substrate/frame/message-queue/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 0c40bf4dadb8..fa53d1ac695b 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -213,9 +213,8 @@ use frame_support::{ pallet_prelude::*, traits::{ Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage, - ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, - ProcessMessageError::{BadFormat, Corrupt, StackLimitReached, Unsupported, Yield}, - QueueFootprint, QueuePausedQuery, ServiceQueues, + ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint, + QueuePausedQuery, ServiceQueues, }, BoundedSlice, CloneNoBound, DefaultNoBound, }; From 74f82f713eb586444c2063df8ece5ef6bb50a105 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 31 Jul 2024 13:41:15 +0100 Subject: [PATCH 06/13] nit --- Cargo.lock | 1 - substrate/frame/message-queue/Cargo.toml | 2 -- substrate/frame/message-queue/src/lib.rs | 7 +++---- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab69ea211bec..7466975fa428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10921,7 +10921,6 @@ dependencies = [ "rand_distr", "scale-info", "serde", - "sp-api", "sp-arithmetic", "sp-core", "sp-crypto-hashing", diff --git a/substrate/frame/message-queue/Cargo.toml b/substrate/frame/message-queue/Cargo.toml index 11902231ea93..a6de61d70abf 100644 --- a/substrate/frame/message-queue/Cargo.toml +++ b/substrate/frame/message-queue/Cargo.toml @@ -23,7 +23,6 @@ sp-io = { workspace = true } sp-runtime = { workspace = true } sp-arithmetic = { workspace = true } sp-weights = { workspace = true } -sp-api = { workspace = true } frame-benchmarking = { optional = true, workspace = true } frame-support = { workspace = true } @@ -51,7 +50,6 @@ std = [ "sp-runtime/std", "sp-tracing/std", "sp-weights/std", - "sp-api/std", ] runtime-benchmarks = [ "frame-benchmarking/runtime-benchmarks", diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index fa53d1ac695b..9ec38bf4949b 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -1447,16 +1447,15 @@ impl Pallet { use ProcessMessageError::*; let prev_consumed = meter.consumed(); - let transaction = storage::with_transaction( - || -> sp_api::TransactionOutcome> { + let transaction = + storage::with_transaction(|| -> TransactionOutcome> { let res = T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id); match &res { Ok(_) => TransactionOutcome::Commit(Ok(res)), Err(_) => TransactionOutcome::Rollback(Ok(res)), } - }, - ); + }); let transaction = match transaction { Ok(result) => result, From c692298cc6a0b8ff0c1ae6d36850369bbbf8347f Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Sun, 4 Aug 2024 07:46:08 +0100 Subject: [PATCH 07/13] include test --- substrate/frame/message-queue/src/mock.rs | 1 + substrate/frame/message-queue/src/tests.rs | 82 ++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/substrate/frame/message-queue/src/mock.rs b/substrate/frame/message-queue/src/mock.rs index 26533cc7c330..4d9870e67c19 100644 --- a/substrate/frame/message-queue/src/mock.rs +++ b/substrate/frame/message-queue/src/mock.rs @@ -164,6 +164,7 @@ impl ProcessMessage for RecordingMessageProcessor { meter: &mut WeightMeter, _id: &mut [u8; 32], ) -> Result { + sp_io::storage::set(b"transactional_storage", &vec![1, 2, 3]); processing_message(message, &origin)?; let weight = if message.starts_with(&b"weight="[..]) { diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index e89fdb8b3208..0e4897b1ed3c 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -1975,3 +1975,85 @@ fn execute_overweight_keeps_stack_ov_message() { System::reset_events(); }); } + +/// Test that process_message is transactional +#[test] +fn test_process_message_transactional() { + use MessageOrigin::*; + build_and_execute::(|| { + // We need to create a mocked message that first reports insufficient weight, and then + // `StackLimitReached`: + IgnoreStackOvError::set(true); + MessageQueue::enqueue_message(msg("stacklimitreached"), Here); + MessageQueue::service_queues(0.into_weight()); + + assert_last_event::( + Event::OverweightEnqueued { + id: blake2_256(b"stacklimitreached"), + origin: MessageOrigin::Here, + message_index: 0, + page_index: 0, + } + .into(), + ); + // Does not count as 'processed': + assert!(MessagesProcessed::take().is_empty()); + assert_pages(&[0]); + + // create a storage + let vec_to_set = vec![1, 2, 3, 4, 5]; + sp_io::storage::set(b"transactional_storage", &vec_to_set); + + // Now let it return `StackLimitReached`. Note that this case would normally not happen, + // since we assume that the top-level execution is the one with the most remaining stack + // depth. + IgnoreStackOvError::set(false); + // Ensure that trying to execute the message does not change any state (besides events). + System::reset_events(); + let storage_noop = StorageNoopGuard::new(); + assert_eq!( + ::execute_overweight(3.into_weight(), (Here, 0, 0)), + Err(ExecuteOverweightError::Other) + ); + assert_last_event::( + Event::ProcessingFailed { + id: blake2_256(b"stacklimitreached").into(), + origin: MessageOrigin::Here, + error: ProcessMessageError::StackLimitReached, + } + .into(), + ); + System::reset_events(); + drop(storage_noop); + + // because the message was processed with an error, transactional_storage changes wasn't commited + // this means storage was rolled back + let stored_vec = sp_io::storage::get(b"transactional_storage").unwrap(); + assert_eq!(stored_vec, vec![1, 2, 3, 4, 5]); + + // Now let's process it normally: + IgnoreStackOvError::set(true); + assert_eq!( + ::execute_overweight(1.into_weight(), (Here, 0, 0)) + .unwrap(), + 1.into_weight() + ); + + // transactional storage changes, this means storage was committed + let stored_vec = sp_io::storage::get(b"transactional_storage").unwrap(); + assert_eq!(stored_vec, vec![1, 2, 3]); + + assert_last_event::( + Event::Processed { + id: blake2_256(b"stacklimitreached").into(), + origin: MessageOrigin::Here, + weight_used: 1.into_weight(), + success: true, + } + .into(), + ); + assert_pages(&[]); + System::reset_events(); + }); +} + From 8b21cca7331542a80347ef2d0721d0ddd55bf6ec Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Sun, 4 Aug 2024 07:46:35 +0100 Subject: [PATCH 08/13] fmt --- substrate/frame/message-queue/src/tests.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index 0e4897b1ed3c..5690c2f7c79c 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -1994,7 +1994,7 @@ fn test_process_message_transactional() { message_index: 0, page_index: 0, } - .into(), + .into(), ); // Does not count as 'processed': assert!(MessagesProcessed::take().is_empty()); @@ -2021,13 +2021,13 @@ fn test_process_message_transactional() { origin: MessageOrigin::Here, error: ProcessMessageError::StackLimitReached, } - .into(), + .into(), ); System::reset_events(); drop(storage_noop); - // because the message was processed with an error, transactional_storage changes wasn't commited - // this means storage was rolled back + // because the message was processed with an error, transactional_storage changes wasn't + // commited this means storage was rolled back let stored_vec = sp_io::storage::get(b"transactional_storage").unwrap(); assert_eq!(stored_vec, vec![1, 2, 3, 4, 5]); @@ -2050,10 +2050,9 @@ fn test_process_message_transactional() { weight_used: 1.into_weight(), success: true, } - .into(), + .into(), ); assert_pages(&[]); System::reset_events(); }); } - From d8201b14db5c6f258991bd592431856e36b9cf62 Mon Sep 17 00:00:00 2001 From: Oliver Tale-Yazdi Date: Mon, 26 Aug 2024 12:13:46 +0200 Subject: [PATCH 09/13] Fix test Signed-off-by: Oliver Tale-Yazdi --- .../message-queue/src/integration_test.rs | 1 + substrate/frame/message-queue/src/mock.rs | 11 +- substrate/frame/message-queue/src/tests.rs | 131 ++++++------------ 3 files changed, 50 insertions(+), 93 deletions(-) diff --git a/substrate/frame/message-queue/src/integration_test.rs b/substrate/frame/message-queue/src/integration_test.rs index 14b8d2217eb2..e4db87d8be7a 100644 --- a/substrate/frame/message-queue/src/integration_test.rs +++ b/substrate/frame/message-queue/src/integration_test.rs @@ -151,6 +151,7 @@ fn stress_test_recursive() { TotalEnqueued::set(TotalEnqueued::get() + enqueued); Enqueued::set(Enqueued::get() + enqueued); Called::set(Called::get() + 1); + Ok(()) })); build_and_execute::(|| { diff --git a/substrate/frame/message-queue/src/mock.rs b/substrate/frame/message-queue/src/mock.rs index 4d9870e67c19..bef4ab0e73b9 100644 --- a/substrate/frame/message-queue/src/mock.rs +++ b/substrate/frame/message-queue/src/mock.rs @@ -164,7 +164,6 @@ impl ProcessMessage for RecordingMessageProcessor { meter: &mut WeightMeter, _id: &mut [u8; 32], ) -> Result { - sp_io::storage::set(b"transactional_storage", &vec![1, 2, 3]); processing_message(message, &origin)?; let weight = if message.starts_with(&b"weight="[..]) { @@ -185,7 +184,9 @@ impl ProcessMessage for RecordingMessageProcessor { if meter.try_consume(required).is_ok() { if let Some(p) = message.strip_prefix(&b"callback="[..]) { let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8"); - Callback::get()(&origin, s.parse().expect("Expected an u32")); + if let Err(()) = Callback::get()(&origin, s.parse().expect("Expected an u32")) { + return Err(ProcessMessageError::Corrupt) + } } let mut m = MessagesProcessed::get(); m.push((message.to_vec(), origin)); @@ -198,7 +199,7 @@ impl ProcessMessage for RecordingMessageProcessor { } parameter_types! { - pub static Callback: Box = Box::new(|_, _| {}); + pub static Callback: Box Result<(), ()>> = Box::new(|_, _| { Ok(()) }); pub static IgnoreStackOvError: bool = false; } @@ -253,7 +254,9 @@ impl ProcessMessage for CountingMessageProcessor { if meter.try_consume(required).is_ok() { if let Some(p) = message.strip_prefix(&b"callback="[..]) { let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8"); - Callback::get()(&origin, s.parse().expect("Expected an u32")); + if let Err(()) = Callback::get()(&origin, s.parse().expect("Expected an u32")) { + return Err(ProcessMessageError::Corrupt) + } } NumMessagesProcessed::set(NumMessagesProcessed::get() + 1); Ok(true) diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index 5690c2f7c79c..8c46f1cce0f1 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -1675,6 +1675,7 @@ fn regression_issue_2319() { build_and_execute::(|| { Callback::set(Box::new(|_, _| { MessageQueue::enqueue_message(mock_helpers::msg("anothermessage"), There); + Ok(()) })); use MessageOrigin::*; @@ -1695,23 +1696,26 @@ fn regression_issue_2319() { #[test] fn recursive_enqueue_works() { build_and_execute::(|| { - Callback::set(Box::new(|o, i| match i { - 0 => { - MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o); - }, - 1 => { - for _ in 0..100 { - MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o); - } - for i in 0..100 { - MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into()); - } - }, - 2 | 3 => { - MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o); - }, - 4 => (), - _ => unreachable!(), + Callback::set(Box::new(|o, i| { + match i { + 0 => { + MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o); + }, + 1 => { + for _ in 0..100 { + MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o); + } + for i in 0..100 { + MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into()); + } + }, + 2 | 3 => { + MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o); + }, + 4 => (), + _ => unreachable!(), + }; + Ok(()) })); MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here); @@ -1735,6 +1739,7 @@ fn recursive_service_is_forbidden() { // This call will fail since it is recursive. But it will not mess up the state. assert_storage_noop!(MessageQueue::service_queues(10.into_weight())); MessageQueue::enqueue_message(msg("m2"), There); + Ok(()) })); for _ in 0..5 { @@ -1778,6 +1783,7 @@ fn recursive_overweight_while_service_is_forbidden() { ), ExecuteOverweightError::RecursiveDisallowed ); + Ok(()) })); MessageQueue::enqueue_message(msg("weight=10"), There); @@ -1800,6 +1806,7 @@ fn recursive_reap_page_is_forbidden() { Callback::set(Box::new(|_, _| { // This call will fail since it is recursive. But it will not mess up the state. assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::::RecursiveDisallowed); + Ok(()) })); // Create 10 pages more than the stale limit. @@ -1976,83 +1983,29 @@ fn execute_overweight_keeps_stack_ov_message() { }); } -/// Test that process_message is transactional #[test] -fn test_process_message_transactional() { - use MessageOrigin::*; +fn process_message_error_reverts_storage_changes() { build_and_execute::(|| { - // We need to create a mocked message that first reports insufficient weight, and then - // `StackLimitReached`: - IgnoreStackOvError::set(true); - MessageQueue::enqueue_message(msg("stacklimitreached"), Here); - MessageQueue::service_queues(0.into_weight()); - - assert_last_event::( - Event::OverweightEnqueued { - id: blake2_256(b"stacklimitreached"), - origin: MessageOrigin::Here, - message_index: 0, - page_index: 0, - } - .into(), - ); - // Does not count as 'processed': - assert!(MessagesProcessed::take().is_empty()); - assert_pages(&[0]); + assert!(!sp_io::storage::exists(b"key"), "Key should not exist"); - // create a storage - let vec_to_set = vec![1, 2, 3, 4, 5]; - sp_io::storage::set(b"transactional_storage", &vec_to_set); - - // Now let it return `StackLimitReached`. Note that this case would normally not happen, - // since we assume that the top-level execution is the one with the most remaining stack - // depth. - IgnoreStackOvError::set(false); - // Ensure that trying to execute the message does not change any state (besides events). - System::reset_events(); - let storage_noop = StorageNoopGuard::new(); - assert_eq!( - ::execute_overweight(3.into_weight(), (Here, 0, 0)), - Err(ExecuteOverweightError::Other) - ); - assert_last_event::( - Event::ProcessingFailed { - id: blake2_256(b"stacklimitreached").into(), - origin: MessageOrigin::Here, - error: ProcessMessageError::StackLimitReached, - } - .into(), - ); - System::reset_events(); - drop(storage_noop); + Callback::set(Box::new(|_, _| { + sp_io::storage::set(b"key", b"value"); + Err(()) + })); - // because the message was processed with an error, transactional_storage changes wasn't - // commited this means storage was rolled back - let stored_vec = sp_io::storage::get(b"transactional_storage").unwrap(); - assert_eq!(stored_vec, vec![1, 2, 3, 4, 5]); + MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here); + MessageQueue::service_queues(10.into_weight()); - // Now let's process it normally: - IgnoreStackOvError::set(true); - assert_eq!( - ::execute_overweight(1.into_weight(), (Here, 0, 0)) - .unwrap(), - 1.into_weight() - ); + assert!(!sp_io::storage::exists(b"key"), "Key should have been rolled back"); + }); +} - // transactional storage changes, this means storage was committed - let stored_vec = sp_io::storage::get(b"transactional_storage").unwrap(); - assert_eq!(stored_vec, vec![1, 2, 3]); +#[test] +fn process_message_ok_false_keeps_storage_changes() { + // FAIL-CI TODO +} - assert_last_event::( - Event::Processed { - id: blake2_256(b"stacklimitreached").into(), - origin: MessageOrigin::Here, - weight_used: 1.into_weight(), - success: true, - } - .into(), - ); - assert_pages(&[]); - System::reset_events(); - }); +#[test] +fn process_message_ok_true_keeps_storage_changes() { + // FAIL-CI TODO } From 851c534a8f9856e1b0eb19169f76ea044b72096d Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 28 Aug 2024 12:02:59 +0100 Subject: [PATCH 10/13] feedback on pr, more tests, and return an error instead of panicking --- substrate/frame/message-queue/src/lib.rs | 2 +- substrate/frame/message-queue/src/mock.rs | 5 ++++ substrate/frame/message-queue/src/tests.rs | 28 ++++++++++++++++++++-- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 9ec38bf4949b..36daf93d155d 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -1459,7 +1459,7 @@ impl Pallet { let transaction = match transaction { Ok(result) => result, - _ => unreachable!(), + _ => return MessageExecutionStatus::Unprocessable { permanent: false }, }; match transaction { diff --git a/substrate/frame/message-queue/src/mock.rs b/substrate/frame/message-queue/src/mock.rs index bef4ab0e73b9..d3f719c62356 100644 --- a/substrate/frame/message-queue/src/mock.rs +++ b/substrate/frame/message-queue/src/mock.rs @@ -187,7 +187,12 @@ impl ProcessMessage for RecordingMessageProcessor { if let Err(()) = Callback::get()(&origin, s.parse().expect("Expected an u32")) { return Err(ProcessMessageError::Corrupt) } + + if s.contains("000") { + return Ok(false) + } } + let mut m = MessagesProcessed::get(); m.push((message.to_vec(), origin)); MessagesProcessed::set(m); diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index 8c46f1cce0f1..1c1f6517a088 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -2002,10 +2002,34 @@ fn process_message_error_reverts_storage_changes() { #[test] fn process_message_ok_false_keeps_storage_changes() { - // FAIL-CI TODO + build_and_execute::(|| { + assert!(!sp_io::storage::exists(b"key"), "Key should not exist"); + + Callback::set(Box::new(|_, _| { + sp_io::storage::set(b"key", b"value"); + Ok(()) + })); + + MessageQueue::enqueue_message(msg("callback=000"), MessageOrigin::Here); + MessageQueue::service_queues(10.into_weight()); + + assert_eq!(sp_io::storage::exists(b"key"), true); + }); } #[test] fn process_message_ok_true_keeps_storage_changes() { - // FAIL-CI TODO + build_and_execute::(|| { + assert!(!sp_io::storage::exists(b"key"), "Key should not exist"); + + Callback::set(Box::new(|_, _| { + sp_io::storage::set(b"key", b"value"); + Ok(()) + })); + + MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here); + MessageQueue::service_queues(10.into_weight()); + + assert_eq!(sp_io::storage::exists(b"key"), true); + }); } From f60ac77a48b6ef8165639ba91e9839e05fcc45a2 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 28 Aug 2024 12:05:13 +0100 Subject: [PATCH 11/13] pr doc change --- prdoc/pr_5198.prdoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prdoc/pr_5198.prdoc b/prdoc/pr_5198.prdoc index 9f7d59e39149..71c050cfbf67 100644 --- a/prdoc/pr_5198.prdoc +++ b/prdoc/pr_5198.prdoc @@ -7,4 +7,4 @@ doc: crates: - name: pallet-message-queue - bump: patch \ No newline at end of file + bump: major \ No newline at end of file From c9e259ed8ba603168a24df92af889cfecad74017 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Wed, 28 Aug 2024 23:15:08 +0100 Subject: [PATCH 12/13] update doc --- substrate/frame/message-queue/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 36daf93d155d..06e7bd8bce57 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -1435,6 +1435,8 @@ impl Pallet { /// The base weight of this function needs to be accounted for by the caller. `weight` is the /// remaining weight to process the message. `overweight_limit` is the maximum weight that a /// message can ever consume. Messages above this limit are marked as permanently overweight. + /// This process is also transactional, any form of error that occurs in processing a message + /// causes storage changes to be rolled back. fn process_message_payload( origin: MessageOriginOf, page_index: PageIndex, From 8589b76796981cecee72f4ad0b90faeeb963204f Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Thu, 29 Aug 2024 11:40:21 +0100 Subject: [PATCH 13/13] further updates --- prdoc/pr_5198.prdoc | 7 +++++-- substrate/frame/message-queue/src/lib.rs | 7 ++++++- substrate/frame/message-queue/src/tests.rs | 1 + 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/prdoc/pr_5198.prdoc b/prdoc/pr_5198.prdoc index 71c050cfbf67..417b0b5a4fd9 100644 --- a/prdoc/pr_5198.prdoc +++ b/prdoc/pr_5198.prdoc @@ -1,9 +1,12 @@ title: "MQ processor should be transactional" doc: - - audience: Runtime User + - audience: [Runtime User, Runtime Dev] description: | - Enforce transactional processing on pallet Message Queue Processor + Enforce transactional processing on pallet Message Queue Processor. + + Storage changes that were done while processing a message will now be rolled back + when the processing returns an error. `Ok(false)` will not revert, only `Err(_)`. crates: - name: pallet-message-queue diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 06e7bd8bce57..48002acb1474 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -1461,7 +1461,12 @@ impl Pallet { let transaction = match transaction { Ok(result) => result, - _ => return MessageExecutionStatus::Unprocessable { permanent: false }, + _ => { + defensive!( + "Error occurred processing message, storage changes will be rolled back" + ); + return MessageExecutionStatus::Unprocessable { permanent: true } + }, }; match transaction { diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index 1c1f6517a088..fac135f135ce 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -2010,6 +2010,7 @@ fn process_message_ok_false_keeps_storage_changes() { Ok(()) })); + // 000 will make it return `Ok(false)` MessageQueue::enqueue_message(msg("callback=000"), MessageOrigin::Here); MessageQueue::service_queues(10.into_weight());