diff --git a/CLI.md b/CLI.md index 5a19daff40b..42956b7067b 100644 --- a/CLI.md +++ b/CLI.md @@ -98,7 +98,7 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp * `--recv-timeout-ms ` — Timeout for receiving responses (milliseconds) Default value: `4000` -* `--max-pending-messages ` +* `--max-pending-message-bundles ` — The maximum number of incoming message bundles to include in a block proposal Default value: `10` * `--wasm-runtime ` — The WebAssembly runtime to use diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index 3f2e1722710..55b236184a4 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -278,7 +278,7 @@ pub struct ChainTipState { pub block_hash: Option, /// Sequence number tracking blocks. pub next_block_height: BlockHeight, - /// Number of incoming messages. + /// Number of incoming message bundles. pub num_incoming_bundles: u32, /// Number of operations. pub num_operations: u32, @@ -623,7 +623,7 @@ where Ok(true) } - /// Removes the incoming messages in the block from the inboxes. + /// Removes the incoming message bundles in the block from the inboxes. pub async fn remove_bundles_from_inboxes(&mut self, block: &Block) -> Result<(), ChainError> { let chain_id = self.chain_id(); let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default(); @@ -691,7 +691,7 @@ where /// * Modifies the state of outboxes and channels, if needed. /// * As usual, in case of errors, `self` may not be consistent any more and should be thrown /// away. - /// * Returns the list of messages caused by the block being executed. + /// * Returns the outcome of the execution. pub async fn execute_block( &mut self, block: &Block, diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index 04892acbf47..eb189b2388a 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -140,7 +140,7 @@ where let client = Client::new( node_provider, storage, - options.max_pending_messages, + options.max_pending_message_bundles, delivery, wallet.chain_ids(), "Client node", diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index fb5e965c54c..03947432976 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -86,8 +86,9 @@ pub struct ClientOptions { #[arg(long = "recv-timeout-ms", default_value = "4000", value_parser = util::parse_millis)] pub recv_timeout: Duration, + /// The maximum number of incoming message bundles to include in a block proposal. #[arg(long, default_value = "10")] - pub max_pending_messages: usize, + pub max_pending_message_bundles: usize, /// The WebAssembly runtime to use. #[arg(long)] diff --git a/linera-core/src/chain_worker/state/temporary_changes.rs b/linera-core/src/chain_worker/state/temporary_changes.rs index 3b57f4f5f61..737aa0fdc45 100644 --- a/linera-core/src/chain_worker/state/temporary_changes.rs +++ b/linera-core/src/chain_worker/state/temporary_changes.rs @@ -283,7 +283,7 @@ where } ); } - if query.request_pending_messages { + if query.request_pending_message_bundles { let mut messages = Vec::new(); let pairs = chain.inboxes.try_load_all_entries().await?; let action = if *chain.execution_state.system.closed.get() { @@ -315,7 +315,7 @@ where } } - info.requested_pending_messages = messages; + info.requested_pending_message_bundles = messages; } if let Some(range) = query.request_sent_certificate_hashes_in_range { let start: usize = range.start.try_into()?; diff --git a/linera-core/src/client.rs b/linera-core/src/client.rs index 14636de28c7..3acfbb605c9 100644 --- a/linera-core/src/client.rs +++ b/linera-core/src/client.rs @@ -87,8 +87,8 @@ where /// Local node to manage the execution state and the local storage of the chains that we are /// tracking. local_node: LocalNodeClient, - /// Maximum number of pending messages processed at a time in a block. - max_pending_messages: usize, + /// Maximum number of pending message bundles processed at a time in a block. + max_pending_message_bundles: usize, /// The policy for automatically handling incoming messages. message_policy: MessagePolicy, /// Whether to block on cross-chain message delivery. @@ -111,7 +111,7 @@ impl Client { pub fn new( validator_node_provider: P, storage: S, - max_pending_messages: usize, + max_pending_message_bundles: usize, cross_chain_message_delivery: CrossChainMessageDelivery, tracked_chains: impl IntoIterator, name: impl Into, @@ -131,7 +131,7 @@ impl Client { validator_node_provider, local_node, chains: DashMap::new(), - max_pending_messages, + max_pending_message_bundles, message_policy: MessagePolicy::new(BlanketMessagePolicy::Accept, None), cross_chain_message_delivery, tracked_chains, @@ -199,7 +199,7 @@ impl Client { client: self.clone(), chain_id, options: ChainClientOptions { - max_pending_messages: self.max_pending_messages, + max_pending_message_bundles: self.max_pending_message_bundles, message_policy: self.message_policy.clone(), cross_chain_message_delivery: self.cross_chain_message_delivery, }, @@ -297,8 +297,8 @@ pub struct ChainState { #[non_exhaustive] #[derive(Debug, Clone)] pub struct ChainClientOptions { - /// Maximum number of pending messages processed at a time in a block. - pub max_pending_messages: usize, + /// Maximum number of pending message bundles processed at a time in a block. + pub max_pending_message_bundles: usize, /// The policy for automatically handling incoming messages. pub message_policy: MessagePolicy, /// Whether to block on cross-chain message delivery. @@ -585,17 +585,15 @@ where } #[tracing::instrument(level = "trace")] - /// Obtains up to `self.options.max_pending_messages` pending messages for the local chain. - /// - /// Messages known to be redundant are filtered out: A `RegisterApplications` message whose - /// entries are already known never needs to be included in a block. - async fn pending_messages(&self) -> Result, ChainClientError> { + /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the + /// local chain. + async fn pending_message_bundles(&self) -> Result, ChainClientError> { if self.state().next_block_height != BlockHeight::ZERO && self.options.message_policy.is_ignore() { return Ok(Vec::new()); // OpenChain is already received, other are ignored. } - let query = ChainInfoQuery::new(self.chain_id).with_pending_messages(); + let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles(); let info = self .client .local_node @@ -606,8 +604,8 @@ where info.next_block_height == self.state().next_block_height, ChainClientError::WalletSynchronizationError ); - let mut requested_pending_messages = info.requested_pending_messages; - let mut pending_messages = vec![]; + let mut requested_pending_message_bundles = info.requested_pending_message_bundles; + let mut pending_message_bundles = vec![]; // The first incoming message of any child chain must be `OpenChain`. We must have it in // our inbox, and include it before all other messages. if info.next_block_height == BlockHeight::ZERO @@ -616,37 +614,40 @@ where .ok_or_else(|| LocalNodeError::InactiveChain(self.chain_id))? .is_child() { - let Some(index) = requested_pending_messages.iter().position(|message| { - matches!( - message.bundle.messages.first(), - Some(PostedMessage { - message: Message::System(SystemMessage::OpenChain(_)), - .. - }) - ) - }) else { + let Some(index) = requested_pending_message_bundles + .iter() + .position(|message| { + matches!( + message.bundle.messages.first(), + Some(PostedMessage { + message: Message::System(SystemMessage::OpenChain(_)), + .. + }) + ) + }) + else { return Err(LocalNodeError::InactiveChain(self.chain_id).into()); }; - let open_chain_message = requested_pending_messages.remove(index); - pending_messages.push(open_chain_message); + let open_chain_bundle = requested_pending_message_bundles.remove(index); + pending_message_bundles.push(open_chain_bundle); } if self.options.message_policy.is_ignore() { - return Ok(pending_messages); // Ignore messages other than OpenChain. + return Ok(pending_message_bundles); // Ignore messages other than OpenChain. } - for mut message in requested_pending_messages { - if pending_messages.len() >= self.options.max_pending_messages { + for mut bundle in requested_pending_message_bundles { + if pending_message_bundles.len() >= self.options.max_pending_message_bundles { warn!( - "Limiting block to {} incoming messages", - self.options.max_pending_messages + "Limiting block to {} incoming message bundles", + self.options.max_pending_message_bundles ); break; } - if !self.options.message_policy.handle(&mut message) { + if !self.options.message_policy.handle(&mut bundle) { continue; } - pending_messages.push(message); + pending_message_bundles.push(bundle); } - Ok(pending_messages) + Ok(pending_message_bundles) } #[tracing::instrument(level = "trace")] @@ -1907,7 +1908,7 @@ where return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) } } - let incoming_bundles = self.pending_messages().await?; + let incoming_bundles = self.pending_message_bundles().await?; let confirmed_value = self.set_pending_block(incoming_bundles, operations).await?; match self.process_pending_block_without_prepare().await? { ClientOutcome::Committed(Some(certificate)) @@ -2035,7 +2036,7 @@ where /// incoming messages in a new block. /// /// Does not attempt to synchronize with validators. The result will reflect up to - /// `max_pending_messages` incoming messages and the execution fees for a single + /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single /// block. pub async fn query_balance(&self) -> Result { let (balance, _) = self.query_balances_with_owner(None).await?; @@ -2047,7 +2048,7 @@ where /// incoming messages in a new block. /// /// Does not attempt to synchronize with validators. The result will reflect up to - /// `max_pending_messages` incoming messages and the execution fees for a single + /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single /// block. pub async fn query_owner_balance(&self, owner: Owner) -> Result { Ok(self @@ -2062,13 +2063,13 @@ where /// staging the execution of incoming messages in a new block. /// /// Does not attempt to synchronize with validators. The result will reflect up to - /// `max_pending_messages` incoming messages and the execution fees for a single + /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single /// block. async fn query_balances_with_owner( &self, owner: Option, ) -> Result<(Amount, Option), ChainClientError> { - let incoming_bundles = self.pending_messages().await?; + let incoming_bundles = self.pending_message_bundles().await?; let timestamp = self.next_timestamp(&incoming_bundles).await; let block = Block { epoch: self.epoch().await?, @@ -2689,7 +2690,8 @@ where } #[tracing::instrument(level = "trace")] - /// Creates an empty block to process all incoming messages. This may require several blocks. + /// Creates blocks without any operations to process all incoming messages. This may require + /// several blocks. /// /// If not all certificates could be processed due to a timeout, the timestamp for when to retry /// is returned, too. @@ -2699,7 +2701,7 @@ where self.prepare_chain().await?; let mut certificates = Vec::new(); loop { - let incoming_bundles = self.pending_messages().await?; + let incoming_bundles = self.pending_message_bundles().await?; if incoming_bundles.is_empty() { return Ok((certificates, None)); } @@ -2715,7 +2717,9 @@ where } #[tracing::instrument(level = "trace")] - /// Creates an empty block to process all incoming messages. This may require several blocks. + /// Creates blocks without any operations to process all incoming messages. This may require + /// several blocks. + /// /// If we are not a chain owner, this doesn't fail, and just returns an empty list. pub async fn process_inbox_if_owned( &self, diff --git a/linera-core/src/data_types.rs b/linera-core/src/data_types.rs index 86d1e628a23..599e267fe03 100644 --- a/linera-core/src/data_types.rs +++ b/linera-core/src/data_types.rs @@ -55,7 +55,7 @@ pub struct ChainInfoQuery { /// Query the current committees. pub request_committees: bool, /// Query the received messages that are waiting be picked in the next block. - pub request_pending_messages: bool, + pub request_pending_message_bundles: bool, /// Query a range of certificate hashes sent from the chain. pub request_sent_certificate_hashes_in_range: Option, /// Query new certificate sender chain IDs and block heights received from the chain. @@ -75,7 +75,7 @@ impl ChainInfoQuery { test_next_block_height: None, request_committees: false, request_owner_balance: None, - request_pending_messages: false, + request_pending_message_bundles: false, request_sent_certificate_hashes_in_range: None, request_received_log_excluding_first_nth: None, request_manager_values: false, @@ -99,8 +99,8 @@ impl ChainInfoQuery { self } - pub fn with_pending_messages(mut self) -> Self { - self.request_pending_messages = true; + pub fn with_pending_message_bundles(mut self) -> Self { + self.request_pending_message_bundles = true; self } @@ -156,7 +156,7 @@ pub struct ChainInfo { /// The current committees. pub requested_committees: Option>, /// The received messages that are waiting be picked in the next block (if requested). - pub requested_pending_messages: Vec, + pub requested_pending_message_bundles: Vec, /// The response to `request_sent_certificate_hashes_in_range` pub requested_sent_certificate_hashes: Vec, /// The current number of received certificates (useful for `request_received_log_excluding_first_nth`) @@ -236,7 +236,7 @@ where state_hash: *view.execution_state_hash.get(), requested_committees: None, requested_owner_balance: None, - requested_pending_messages: Vec::new(), + requested_pending_message_bundles: Vec::new(), requested_sent_certificate_hashes: Vec::new(), count_received_log: view.received_log.count(), requested_received_log: Vec::new(), diff --git a/linera-rpc/proto/rpc.proto b/linera-rpc/proto/rpc.proto index 30ddd9e0a9e..fcc3d1df7eb 100644 --- a/linera-rpc/proto/rpc.proto +++ b/linera-rpc/proto/rpc.proto @@ -140,7 +140,7 @@ message ChainInfoQuery { bool request_committees = 3; // Query the received messages that are waiting be picked in the next block. - bool request_pending_messages = 4; + bool request_pending_message_bundles = 4; // Query a range of certificates hashes sent from the chain. optional bytes request_sent_certificate_hashes_in_range = 5; diff --git a/linera-rpc/src/grpc/conversions.rs b/linera-rpc/src/grpc/conversions.rs index 45bddbf6f8e..ba29c4b8404 100644 --- a/linera-rpc/src/grpc/conversions.rs +++ b/linera-rpc/src/grpc/conversions.rs @@ -377,7 +377,7 @@ impl TryFrom for ChainInfoQuery { .request_owner_balance .map(TryInto::try_into) .transpose()?, - request_pending_messages: chain_info_query.request_pending_messages, + request_pending_message_bundles: chain_info_query.request_pending_message_bundles, chain_id: try_proto_convert(chain_info_query.chain_id)?, request_sent_certificate_hashes_in_range, request_received_log_excluding_first_nth: chain_info_query @@ -403,7 +403,7 @@ impl TryFrom for api::ChainInfoQuery { chain_id: Some(chain_info_query.chain_id.into()), request_committees: chain_info_query.request_committees, request_owner_balance: chain_info_query.request_owner_balance.map(Into::into), - request_pending_messages: chain_info_query.request_pending_messages, + request_pending_message_bundles: chain_info_query.request_pending_message_bundles, test_next_block_height: chain_info_query.test_next_block_height.map(Into::into), request_sent_certificate_hashes_in_range, request_received_log_excluding_first_nth: chain_info_query @@ -698,7 +698,7 @@ pub mod tests { state_hash: None, requested_committees: None, requested_owner_balance: None, - requested_pending_messages: vec![], + requested_pending_message_bundles: vec![], requested_sent_certificate_hashes: vec![], count_received_log: 0, requested_received_log: vec![], @@ -729,7 +729,7 @@ pub mod tests { test_next_block_height: Some(BlockHeight::from(10)), request_committees: false, request_owner_balance: None, - request_pending_messages: false, + request_pending_message_bundles: false, request_sent_certificate_hashes_in_range: Some( linera_core::data_types::BlockHeightRange { start: BlockHeight::from(3), diff --git a/linera-rpc/tests/snapshots/format__format.yaml.snap b/linera-rpc/tests/snapshots/format__format.yaml.snap index ed69e9f3cc8..b3ca19e91c4 100644 --- a/linera-rpc/tests/snapshots/format__format.yaml.snap +++ b/linera-rpc/tests/snapshots/format__format.yaml.snap @@ -216,7 +216,7 @@ ChainInfo: TYPENAME: Epoch VALUE: TYPENAME: Committee - - requested_pending_messages: + - requested_pending_message_bundles: SEQ: TYPENAME: IncomingBundle - requested_sent_certificate_hashes: @@ -237,7 +237,7 @@ ChainInfoQuery: OPTION: TYPENAME: Owner - request_committees: BOOL - - request_pending_messages: BOOL + - request_pending_message_bundles: BOOL - request_sent_certificate_hashes_in_range: OPTION: TYPENAME: BlockHeightRange diff --git a/linera-sdk/src/test/chain.rs b/linera-sdk/src/test/chain.rs index 510ec9966cb..8cc9e48c34f 100644 --- a/linera-sdk/src/test/chain.rs +++ b/linera-sdk/src/test/chain.rs @@ -135,10 +135,10 @@ impl ActiveChain { let (information, _) = self .validator .worker() - .handle_chain_info_query(ChainInfoQuery::new(chain_id).with_pending_messages()) + .handle_chain_info_query(ChainInfoQuery::new(chain_id).with_pending_message_bundles()) .await .expect("Failed to query chain's pending messages"); - let messages = information.info.requested_pending_messages; + let messages = information.info.requested_pending_message_bundles; self.add_block(|block| { block.with_incoming_bundles(messages); diff --git a/linera-service-graphql-client/gql/service_schema.graphql b/linera-service-graphql-client/gql/service_schema.graphql index 3085967fb20..29f440a86b2 100644 --- a/linera-service-graphql-client/gql/service_schema.graphql +++ b/linera-service-graphql-client/gql/service_schema.graphql @@ -252,7 +252,7 @@ type ChainTipState { """ nextBlockHeight: BlockHeight! """ - Number of incoming messages. + Number of incoming message bundles. """ numIncomingBundles: Int! """ diff --git a/linera-service/src/cli_wrappers/wallet.rs b/linera-service/src/cli_wrappers/wallet.rs index 338fac53f7e..ad55f754df1 100644 --- a/linera-service/src/cli_wrappers/wallet.rs +++ b/linera-service/src/cli_wrappers/wallet.rs @@ -57,7 +57,7 @@ pub struct ClientWrapper { testing_prng_seed: Option, storage: String, wallet: String, - max_pending_messages: usize, + max_pending_message_bundles: usize, network: Network, pub path_provider: PathProvider, } @@ -79,7 +79,7 @@ impl ClientWrapper { testing_prng_seed, storage, wallet, - max_pending_messages: 10_000, + max_pending_message_bundles: 10_000, network, path_provider, } @@ -152,8 +152,8 @@ impl ClientWrapper { .args(["--wallet", &self.wallet]) .args(["--storage", &self.storage]) .args([ - "--max-pending-messages", - &self.max_pending_messages.to_string(), + "--max-pending-message-bundles", + &self.max_pending_message_bundles.to_string(), ]) .args(["--send-timeout-ms", "500000"]) .args(["--recv-timeout-ms", "500000"]) diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index e3e3704669d..04357cbf5de 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -303,7 +303,8 @@ impl Runnable for Job { let account = account.unwrap_or_else(|| context.default_account()); let chain_client = context.make_chain_client(account.chain_id); info!( - "Evaluating the local balance of {} by staging execution of known incoming messages", account + "Evaluating the local balance of {account} by staging execution of known \ + incoming messages" ); let time_start = Instant::now(); let balance = match account.owner { @@ -438,7 +439,6 @@ impl Runnable for Job { info!("Starting operations to change validator set"); let time_start = Instant::now(); - // Make sure genesis chains are subscribed to the admin chain. let context = Arc::new(Mutex::new(context)); let mut context = context.lock().await; if let SetValidator {