Skip to content

Commit

Permalink
Update outdated comments and identifiers (#2448)
Browse files Browse the repository at this point in the history
* Remove an obsolete comment.

* Update comments, identifiers related to message bundles.
  • Loading branch information
afck authored Sep 4, 2024
1 parent 37c8976 commit 69354e8
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 72 deletions.
2 changes: 1 addition & 1 deletion CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp
* `--recv-timeout-ms <RECV_TIMEOUT>` — Timeout for receiving responses (milliseconds)

Default value: `4000`
* `--max-pending-messages <MAX_PENDING_MESSAGES>`
* `--max-pending-message-bundles <MAX_PENDING_MESSAGE_BUNDLES>` — The maximum number of incoming message bundles to include in a block proposal

Default value: `10`
* `--wasm-runtime <WASM_RUNTIME>` — The WebAssembly runtime to use
Expand Down
6 changes: 3 additions & 3 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub struct ChainTipState {
pub block_hash: Option<CryptoHash>,
/// Sequence number tracking blocks.
pub next_block_height: BlockHeight,
/// Number of incoming messages.
/// Number of incoming message bundles.
pub num_incoming_bundles: u32,
/// Number of operations.
pub num_operations: u32,
Expand Down Expand Up @@ -623,7 +623,7 @@ where
Ok(true)
}

/// Removes the incoming messages in the block from the inboxes.
/// Removes the incoming message bundles in the block from the inboxes.
pub async fn remove_bundles_from_inboxes(&mut self, block: &Block) -> Result<(), ChainError> {
let chain_id = self.chain_id();
let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
Expand Down Expand Up @@ -691,7 +691,7 @@ where
/// * Modifies the state of outboxes and channels, if needed.
/// * As usual, in case of errors, `self` may not be consistent any more and should be thrown
/// away.
/// * Returns the list of messages caused by the block being executed.
/// * Returns the outcome of the execution.
pub async fn execute_block(
&mut self,
block: &Block,
Expand Down
2 changes: 1 addition & 1 deletion linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where
let client = Client::new(
node_provider,
storage,
options.max_pending_messages,
options.max_pending_message_bundles,
delivery,
wallet.chain_ids(),
"Client node",
Expand Down
3 changes: 2 additions & 1 deletion linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ pub struct ClientOptions {
#[arg(long = "recv-timeout-ms", default_value = "4000", value_parser = util::parse_millis)]
pub recv_timeout: Duration,

/// The maximum number of incoming message bundles to include in a block proposal.
#[arg(long, default_value = "10")]
pub max_pending_messages: usize,
pub max_pending_message_bundles: usize,

/// The WebAssembly runtime to use.
#[arg(long)]
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/chain_worker/state/temporary_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ where
}
);
}
if query.request_pending_messages {
if query.request_pending_message_bundles {
let mut messages = Vec::new();
let pairs = chain.inboxes.try_load_all_entries().await?;
let action = if *chain.execution_state.system.closed.get() {
Expand Down Expand Up @@ -315,7 +315,7 @@ where
}
}

info.requested_pending_messages = messages;
info.requested_pending_message_bundles = messages;
}
if let Some(range) = query.request_sent_certificate_hashes_in_range {
let start: usize = range.start.try_into()?;
Expand Down
88 changes: 46 additions & 42 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ where
/// Local node to manage the execution state and the local storage of the chains that we are
/// tracking.
local_node: LocalNodeClient<Storage>,
/// Maximum number of pending messages processed at a time in a block.
max_pending_messages: usize,
/// Maximum number of pending message bundles processed at a time in a block.
max_pending_message_bundles: usize,
/// The policy for automatically handling incoming messages.
message_policy: MessagePolicy,
/// Whether to block on cross-chain message delivery.
Expand All @@ -111,7 +111,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
pub fn new(
validator_node_provider: P,
storage: S,
max_pending_messages: usize,
max_pending_message_bundles: usize,
cross_chain_message_delivery: CrossChainMessageDelivery,
tracked_chains: impl IntoIterator<Item = ChainId>,
name: impl Into<String>,
Expand All @@ -131,7 +131,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
validator_node_provider,
local_node,
chains: DashMap::new(),
max_pending_messages,
max_pending_message_bundles,
message_policy: MessagePolicy::new(BlanketMessagePolicy::Accept, None),
cross_chain_message_delivery,
tracked_chains,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
client: self.clone(),
chain_id,
options: ChainClientOptions {
max_pending_messages: self.max_pending_messages,
max_pending_message_bundles: self.max_pending_message_bundles,
message_policy: self.message_policy.clone(),
cross_chain_message_delivery: self.cross_chain_message_delivery,
},
Expand Down Expand Up @@ -297,8 +297,8 @@ pub struct ChainState {
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ChainClientOptions {
/// Maximum number of pending messages processed at a time in a block.
pub max_pending_messages: usize,
/// Maximum number of pending message bundles processed at a time in a block.
pub max_pending_message_bundles: usize,
/// The policy for automatically handling incoming messages.
pub message_policy: MessagePolicy,
/// Whether to block on cross-chain message delivery.
Expand Down Expand Up @@ -585,17 +585,15 @@ where
}

#[tracing::instrument(level = "trace")]
/// Obtains up to `self.options.max_pending_messages` pending messages for the local chain.
///
/// Messages known to be redundant are filtered out: A `RegisterApplications` message whose
/// entries are already known never needs to be included in a block.
async fn pending_messages(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
/// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
/// local chain.
async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
if self.state().next_block_height != BlockHeight::ZERO
&& self.options.message_policy.is_ignore()
{
return Ok(Vec::new()); // OpenChain is already received, other are ignored.
}
let query = ChainInfoQuery::new(self.chain_id).with_pending_messages();
let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
let info = self
.client
.local_node
Expand All @@ -606,8 +604,8 @@ where
info.next_block_height == self.state().next_block_height,
ChainClientError::WalletSynchronizationError
);
let mut requested_pending_messages = info.requested_pending_messages;
let mut pending_messages = vec![];
let mut requested_pending_message_bundles = info.requested_pending_message_bundles;
let mut pending_message_bundles = vec![];
// The first incoming message of any child chain must be `OpenChain`. We must have it in
// our inbox, and include it before all other messages.
if info.next_block_height == BlockHeight::ZERO
Expand All @@ -616,37 +614,40 @@ where
.ok_or_else(|| LocalNodeError::InactiveChain(self.chain_id))?
.is_child()
{
let Some(index) = requested_pending_messages.iter().position(|message| {
matches!(
message.bundle.messages.first(),
Some(PostedMessage {
message: Message::System(SystemMessage::OpenChain(_)),
..
})
)
}) else {
let Some(index) = requested_pending_message_bundles
.iter()
.position(|message| {
matches!(
message.bundle.messages.first(),
Some(PostedMessage {
message: Message::System(SystemMessage::OpenChain(_)),
..
})
)
})
else {
return Err(LocalNodeError::InactiveChain(self.chain_id).into());
};
let open_chain_message = requested_pending_messages.remove(index);
pending_messages.push(open_chain_message);
let open_chain_bundle = requested_pending_message_bundles.remove(index);
pending_message_bundles.push(open_chain_bundle);
}
if self.options.message_policy.is_ignore() {
return Ok(pending_messages); // Ignore messages other than OpenChain.
return Ok(pending_message_bundles); // Ignore messages other than OpenChain.
}
for mut message in requested_pending_messages {
if pending_messages.len() >= self.options.max_pending_messages {
for mut bundle in requested_pending_message_bundles {
if pending_message_bundles.len() >= self.options.max_pending_message_bundles {
warn!(
"Limiting block to {} incoming messages",
self.options.max_pending_messages
"Limiting block to {} incoming message bundles",
self.options.max_pending_message_bundles
);
break;
}
if !self.options.message_policy.handle(&mut message) {
if !self.options.message_policy.handle(&mut bundle) {
continue;
}
pending_messages.push(message);
pending_message_bundles.push(bundle);
}
Ok(pending_messages)
Ok(pending_message_bundles)
}

#[tracing::instrument(level = "trace")]
Expand Down Expand Up @@ -1907,7 +1908,7 @@ where
return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
}
}
let incoming_bundles = self.pending_messages().await?;
let incoming_bundles = self.pending_message_bundles().await?;
let confirmed_value = self.set_pending_block(incoming_bundles, operations).await?;
match self.process_pending_block_without_prepare().await? {
ClientOutcome::Committed(Some(certificate))
Expand Down Expand Up @@ -2035,7 +2036,7 @@ where
/// incoming messages in a new block.
///
/// Does not attempt to synchronize with validators. The result will reflect up to
/// `max_pending_messages` incoming messages and the execution fees for a single
/// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
/// block.
pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
let (balance, _) = self.query_balances_with_owner(None).await?;
Expand All @@ -2047,7 +2048,7 @@ where
/// incoming messages in a new block.
///
/// Does not attempt to synchronize with validators. The result will reflect up to
/// `max_pending_messages` incoming messages and the execution fees for a single
/// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
/// block.
pub async fn query_owner_balance(&self, owner: Owner) -> Result<Amount, ChainClientError> {
Ok(self
Expand All @@ -2062,13 +2063,13 @@ where
/// staging the execution of incoming messages in a new block.
///
/// Does not attempt to synchronize with validators. The result will reflect up to
/// `max_pending_messages` incoming messages and the execution fees for a single
/// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
/// block.
async fn query_balances_with_owner(
&self,
owner: Option<Owner>,
) -> Result<(Amount, Option<Amount>), ChainClientError> {
let incoming_bundles = self.pending_messages().await?;
let incoming_bundles = self.pending_message_bundles().await?;
let timestamp = self.next_timestamp(&incoming_bundles).await;
let block = Block {
epoch: self.epoch().await?,
Expand Down Expand Up @@ -2689,7 +2690,8 @@ where
}

#[tracing::instrument(level = "trace")]
/// Creates an empty block to process all incoming messages. This may require several blocks.
/// Creates blocks without any operations to process all incoming messages. This may require
/// several blocks.
///
/// If not all certificates could be processed due to a timeout, the timestamp for when to retry
/// is returned, too.
Expand All @@ -2699,7 +2701,7 @@ where
self.prepare_chain().await?;
let mut certificates = Vec::new();
loop {
let incoming_bundles = self.pending_messages().await?;
let incoming_bundles = self.pending_message_bundles().await?;
if incoming_bundles.is_empty() {
return Ok((certificates, None));
}
Expand All @@ -2715,7 +2717,9 @@ where
}

#[tracing::instrument(level = "trace")]
/// Creates an empty block to process all incoming messages. This may require several blocks.
/// Creates blocks without any operations to process all incoming messages. This may require
/// several blocks.
///
/// If we are not a chain owner, this doesn't fail, and just returns an empty list.
pub async fn process_inbox_if_owned(
&self,
Expand Down
12 changes: 6 additions & 6 deletions linera-core/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct ChainInfoQuery {
/// Query the current committees.
pub request_committees: bool,
/// Query the received messages that are waiting be picked in the next block.
pub request_pending_messages: bool,
pub request_pending_message_bundles: bool,
/// Query a range of certificate hashes sent from the chain.
pub request_sent_certificate_hashes_in_range: Option<BlockHeightRange>,
/// Query new certificate sender chain IDs and block heights received from the chain.
Expand All @@ -75,7 +75,7 @@ impl ChainInfoQuery {
test_next_block_height: None,
request_committees: false,
request_owner_balance: None,
request_pending_messages: false,
request_pending_message_bundles: false,
request_sent_certificate_hashes_in_range: None,
request_received_log_excluding_first_nth: None,
request_manager_values: false,
Expand All @@ -99,8 +99,8 @@ impl ChainInfoQuery {
self
}

pub fn with_pending_messages(mut self) -> Self {
self.request_pending_messages = true;
pub fn with_pending_message_bundles(mut self) -> Self {
self.request_pending_message_bundles = true;
self
}

Expand Down Expand Up @@ -156,7 +156,7 @@ pub struct ChainInfo {
/// The current committees.
pub requested_committees: Option<BTreeMap<Epoch, Committee>>,
/// The received messages that are waiting be picked in the next block (if requested).
pub requested_pending_messages: Vec<IncomingBundle>,
pub requested_pending_message_bundles: Vec<IncomingBundle>,
/// The response to `request_sent_certificate_hashes_in_range`
pub requested_sent_certificate_hashes: Vec<CryptoHash>,
/// The current number of received certificates (useful for `request_received_log_excluding_first_nth`)
Expand Down Expand Up @@ -236,7 +236,7 @@ where
state_hash: *view.execution_state_hash.get(),
requested_committees: None,
requested_owner_balance: None,
requested_pending_messages: Vec::new(),
requested_pending_message_bundles: Vec::new(),
requested_sent_certificate_hashes: Vec::new(),
count_received_log: view.received_log.count(),
requested_received_log: Vec::new(),
Expand Down
2 changes: 1 addition & 1 deletion linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ message ChainInfoQuery {
bool request_committees = 3;

// Query the received messages that are waiting be picked in the next block.
bool request_pending_messages = 4;
bool request_pending_message_bundles = 4;

// Query a range of certificates hashes sent from the chain.
optional bytes request_sent_certificate_hashes_in_range = 5;
Expand Down
8 changes: 4 additions & 4 deletions linera-rpc/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl TryFrom<api::ChainInfoQuery> for ChainInfoQuery {
.request_owner_balance
.map(TryInto::try_into)
.transpose()?,
request_pending_messages: chain_info_query.request_pending_messages,
request_pending_message_bundles: chain_info_query.request_pending_message_bundles,
chain_id: try_proto_convert(chain_info_query.chain_id)?,
request_sent_certificate_hashes_in_range,
request_received_log_excluding_first_nth: chain_info_query
Expand All @@ -403,7 +403,7 @@ impl TryFrom<ChainInfoQuery> for api::ChainInfoQuery {
chain_id: Some(chain_info_query.chain_id.into()),
request_committees: chain_info_query.request_committees,
request_owner_balance: chain_info_query.request_owner_balance.map(Into::into),
request_pending_messages: chain_info_query.request_pending_messages,
request_pending_message_bundles: chain_info_query.request_pending_message_bundles,
test_next_block_height: chain_info_query.test_next_block_height.map(Into::into),
request_sent_certificate_hashes_in_range,
request_received_log_excluding_first_nth: chain_info_query
Expand Down Expand Up @@ -698,7 +698,7 @@ pub mod tests {
state_hash: None,
requested_committees: None,
requested_owner_balance: None,
requested_pending_messages: vec![],
requested_pending_message_bundles: vec![],
requested_sent_certificate_hashes: vec![],
count_received_log: 0,
requested_received_log: vec![],
Expand Down Expand Up @@ -729,7 +729,7 @@ pub mod tests {
test_next_block_height: Some(BlockHeight::from(10)),
request_committees: false,
request_owner_balance: None,
request_pending_messages: false,
request_pending_message_bundles: false,
request_sent_certificate_hashes_in_range: Some(
linera_core::data_types::BlockHeightRange {
start: BlockHeight::from(3),
Expand Down
4 changes: 2 additions & 2 deletions linera-rpc/tests/snapshots/format__format.yaml.snap
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ ChainInfo:
TYPENAME: Epoch
VALUE:
TYPENAME: Committee
- requested_pending_messages:
- requested_pending_message_bundles:
SEQ:
TYPENAME: IncomingBundle
- requested_sent_certificate_hashes:
Expand All @@ -237,7 +237,7 @@ ChainInfoQuery:
OPTION:
TYPENAME: Owner
- request_committees: BOOL
- request_pending_messages: BOOL
- request_pending_message_bundles: BOOL
- request_sent_certificate_hashes_in_range:
OPTION:
TYPENAME: BlockHeightRange
Expand Down
Loading

0 comments on commit 69354e8

Please sign in to comment.