Skip to content

Commit

Permalink
Execute messages from the same transaction to the same destination at…
Browse files Browse the repository at this point in the history
…omically. (#2343)

* Atomic message bundles

* Add a TransactionTracker; have one oracle responses list per txn.

* Deduplicate bundle and operation handling.

* Extract message execution from execute_block.

* Move next_message_index to TransactionTracker.

* Add to_posted test helper function.

* Fix message_id_for_operation.

* Return iterator from make_message_bundles_for.

* Address review comments.

* Address review comments; don't track messages twice.
  • Loading branch information
afck authored Aug 14, 2024
1 parent bdba50e commit 449d5a0
Show file tree
Hide file tree
Showing 41 changed files with 1,755 additions and 1,698 deletions.
504 changes: 231 additions & 273 deletions linera-chain/src/chain.rs

Large diffs are not rendered by default.

259 changes: 180 additions & 79 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use linera_base::{
};
use linera_execution::{
committee::{Committee, Epoch, ValidatorName},
BytecodeLocation, Message, MessageKind, Operation, SystemOperation,
BytecodeLocation, Message, MessageKind, Operation, SystemMessage, SystemOperation,
};
use serde::{de::Deserializer, Deserialize, Serialize};

Expand Down Expand Up @@ -63,9 +63,12 @@ impl Block {
/// Returns all bytecode locations referred to in this block's incoming messages.
pub fn bytecode_locations(&self) -> HashSet<BytecodeLocation> {
let mut locations = HashSet::new();
for message in &self.incoming_bundles {
if let Message::System(sys_message) = &message.event.message {
locations.extend(sys_message.bytecode_locations(message.event.certificate_hash));
for incoming_bundle in &self.incoming_bundles {
let bundle = &incoming_bundle.bundle;
for posted_message in &bundle.messages {
if let Message::System(sys_message) = &posted_message.message {
locations.extend(sys_message.bytecode_locations(bundle.certificate_hash));
}
}
}
locations
Expand All @@ -92,6 +95,51 @@ impl Block {
.iter()
.all(|message| message.action == MessageAction::Reject)
}

/// Returns whether `OpenChain` is the first message in this block.
pub fn starts_with_open_chain(&self) -> bool {
let Some(posted_message) = self.incoming_messages().next() else {
return false;
};
matches!(
posted_message.message,
Message::System(SystemMessage::OpenChain(_))
)
}

/// Returns an iterator over all incoming [`PostedMessage`]s in this block.
pub fn incoming_messages(&self) -> impl Iterator<Item = &PostedMessage> {
self.incoming_bundles
.iter()
.flat_map(|incoming_bundle| &incoming_bundle.bundle.messages)
}

/// Returns the number of incoming messages.
pub fn message_count(&self) -> usize {
self.incoming_bundles
.iter()
.map(|im| im.bundle.messages.len())
.sum()
}

/// Returns an iterator over all transactions, by index.
pub fn transactions(&self) -> impl Iterator<Item = (u32, Transaction<'_>)> {
let bundles = self
.incoming_bundles
.iter()
.map(Transaction::ReceiveMessages);
let operations = self.operations.iter().map(Transaction::ExecuteOperation);
(0u32..).zip(bundles.chain(operations))
}
}

/// A transaction in a block: incoming messages or an operation.
#[derive(Debug, Clone)]
pub enum Transaction<'a> {
/// Receive a bundle of incoming messages.
ReceiveMessages(&'a IncomingBundle),
/// Execute an operation.
ExecuteOperation(&'a Operation),
}

/// A chain ID with a block height.
Expand All @@ -101,18 +149,43 @@ pub struct ChainAndHeight {
pub height: BlockHeight,
}

/// A message received from a block of another chain.
impl ChainAndHeight {
/// Returns the ID of the `index`-th message sent by the block at that height.
pub fn to_message_id(&self, index: u32) -> MessageId {
MessageId {
chain_id: self.chain_id,
height: self.height,
index,
}
}
}

/// A bundle of cross-chain messages.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct IncomingBundle {
/// The origin of the message (chain and channel if any).
/// The origin of the messages (chain and channel if any).
pub origin: Origin,
/// The content of the message to be delivered to the inbox identified by
/// `origin`.
pub event: Event,
/// The messages to be delivered to the inbox identified by `origin`.
pub bundle: MessageBundle,
/// What to do with the message.
pub action: MessageAction,
}

impl IncomingBundle {
/// Returns an iterator over all posted messages in this bundle, together with their ID.
pub fn messages_and_ids(&self) -> impl Iterator<Item = (MessageId, &PostedMessage)> {
let chain_and_height = ChainAndHeight {
chain_id: self.origin.sender,
height: self.bundle.height,
};
let messages = self.bundle.messages.iter();
messages.map(move |posted_message| {
let message_id = chain_and_height.to_message_id(posted_message.index);
(message_id, posted_message)
})
}
}

/// What to do with a message picked from the inbox.
#[derive(Copy, Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub enum MessageAction {
Expand All @@ -122,41 +195,6 @@ pub enum MessageAction {
Reject,
}

impl IncomingBundle {
/// Returns the ID identifying this message.
pub fn id(&self) -> MessageId {
MessageId {
chain_id: self.origin.sender,
height: self.event.height,
index: self.event.index,
}
}
}

/// A message together with non replayable information to ensure uniqueness in a
/// particular inbox.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct Event {
/// The hash of the certificate that created the event.
pub certificate_hash: CryptoHash,
/// The height of the block that created the event.
pub height: BlockHeight,
/// The index of the message.
pub index: u32,
/// The authenticated signer for the operation that created the event, if any.
pub authenticated_signer: Option<Owner>,
/// A grant to pay for the message execution.
pub grant: Amount,
/// Where to send a refund for the unused part of the grant after execution, if any.
pub refund_grant_to: Option<Account>,
/// The kind of event being delivered.
pub kind: MessageKind,
/// The timestamp of the block that caused the message.
pub timestamp: Timestamp,
/// The message of the event (i.e. the actual payload of a message).
pub message: Message,
}

/// The origin of a message, relative to a particular application. Used to identify each inbox.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Serialize, Deserialize)]
pub struct Origin {
Expand All @@ -176,19 +214,18 @@ pub struct Target {
}

/// A set of messages from a single block, for a single destination.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
#[derive(Debug, Eq, PartialEq, Clone, Hash, Serialize, Deserialize)]
pub struct MessageBundle {
/// The block height.
pub height: BlockHeight,
/// The block's epoch.
pub epoch: Epoch,
/// The block's timestamp.
pub timestamp: Timestamp,
/// The confirmed block certificate hash.
pub hash: CryptoHash,
/// The relevant messages, with their index.
pub messages: Vec<(u32, OutgoingMessage)>,
pub certificate_hash: CryptoHash,
/// The index of the transaction in the block that is sending this bundle.
pub transaction_index: u32,
/// The relevant messages.
pub messages: Vec<PostedMessage>,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
Expand Down Expand Up @@ -223,7 +260,7 @@ pub struct BlockProposal {
pub validated_block_certificate: Option<LiteCertificate<'static>>,
}

/// A message together with routing information.
/// A posted message together with routing information.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct OutgoingMessage {
/// The destination of the message.
Expand All @@ -234,8 +271,25 @@ pub struct OutgoingMessage {
pub grant: Amount,
/// Where to send a refund for the unused part of the grant after execution, if any.
pub refund_grant_to: Option<Account>,
/// The kind of event being sent.
/// The kind of message being sent.
pub kind: MessageKind,
/// The message itself.
pub message: Message,
}

/// A message together with kind, authentication and grant information.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct PostedMessage {
/// The user authentication carried by the message, if any.
pub authenticated_signer: Option<Owner>,
/// A grant to pay for the message execution.
pub grant: Amount,
/// Where to send a refund for the unused part of the grant after execution, if any.
pub refund_grant_to: Option<Account>,
/// The kind of message being sent.
pub kind: MessageKind,
/// The index of the message in the sending block.
pub index: u32,
/// The message itself.
pub message: Message,
}
Expand All @@ -258,6 +312,26 @@ impl OutgoingMessage {
) => *application_id == self.message.application_id() && name == dest_name,
}
}

/// Returns the posted message, i.e. the outgoing message without the destination.
pub fn into_posted(self, index: u32) -> PostedMessage {
let OutgoingMessage {
destination: _,
authenticated_signer,
grant,
refund_grant_to,
kind,
message,
} = self;
PostedMessage {
authenticated_signer,
grant,
refund_grant_to,
kind,
index,
message,
}
}
}

/// A [`Block`], together with the outcome from its execution.
Expand Down Expand Up @@ -678,7 +752,29 @@ impl CertificateValue {
}
}

impl Event {
impl MessageBundle {
pub fn is_skippable(&self) -> bool {
self.messages.iter().all(PostedMessage::is_skippable)
}

pub fn is_tracked(&self) -> bool {
let mut tracked = false;
for posted_message in &self.messages {
match posted_message.kind {
MessageKind::Simple | MessageKind::Bouncing => {}
MessageKind::Protected => return false,
MessageKind::Tracked => tracked = true,
}
}
tracked
}

pub fn is_protected(&self) -> bool {
self.messages.iter().any(PostedMessage::is_protected)
}
}

impl PostedMessage {
pub fn is_skippable(&self) -> bool {
use MessageKind::*;
match self.kind {
Expand Down Expand Up @@ -1109,30 +1205,35 @@ impl Certificate {
/// recipient. Messages originating from different transactions of the original block
/// are kept in separate bundles. If the medium is a channel, does not verify that the
/// recipient is actually subscribed to that channel.
pub fn message_bundles_for(&self, medium: &Medium, recipient: ChainId) -> Vec<MessageBundle> {
let Some(executed_block) = self.value().executed_block() else {
return Vec::new();
};
let mut bundles = Vec::new();
pub fn message_bundles_for<'a>(
&'a self,
medium: &'a Medium,
recipient: ChainId,
) -> impl Iterator<Item = (Epoch, MessageBundle)> + 'a {
let mut index = 0u32;
for block_messages in executed_block.messages().iter() {
let messages = (index..)
.zip(block_messages)
.filter(|(_, message)| message.has_destination(medium, recipient))
.map(|(idx, message)| (idx, message.clone()))
.collect::<Vec<_>>();
index += block_messages.len() as u32;
if !messages.is_empty() {
bundles.push(MessageBundle {
height: executed_block.block.height,
epoch: executed_block.block.epoch,
timestamp: executed_block.block.timestamp,
hash: self.hash(),
messages,
});
}
}
bundles
let maybe_executed_block = self.value().executed_block().into_iter();
maybe_executed_block.flat_map(move |executed_block| {
(0u32..).zip(executed_block.messages()).filter_map(
move |(transaction_index, txn_messages)| {
let messages = (index..)
.zip(txn_messages)
.filter(|(_, message)| message.has_destination(medium, recipient))
.map(|(idx, message)| message.clone().into_posted(idx))
.collect::<Vec<_>>();
index += txn_messages.len() as u32;
(!messages.is_empty()).then(|| {
let bundle = MessageBundle {
height: executed_block.block.height,
timestamp: executed_block.block.timestamp,
certificate_hash: self.hash(),
transaction_index,
messages,
};
(executed_block.block.epoch, bundle)
})
},
)
})
}

pub fn requires_blob(&self, blob_id: &BlobId) -> bool {
Expand Down Expand Up @@ -1197,8 +1298,8 @@ doc_scalar!(
"A channel name together with its application ID"
);
doc_scalar!(
Event,
"A message together with non replayable information to ensure uniqueness in a particular inbox"
MessageBundle,
"A set of messages from a single block, for a single destination."
);
doc_scalar!(
Medium,
Expand Down
Loading

0 comments on commit 449d5a0

Please sign in to comment.