Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock mutex in more client methods. #2567

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions linera-core/src/client/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ pub struct ChainState {
/// been processed by (i.e. been proposed to) our own local chain manager yet.
pending_blobs: BTreeMap<BlobId, Blob>,

/// A mutex that is held whilst we are preparing the next block, to ensure that no
/// other client can begin preparing a block.
preparing_block: Arc<Mutex<()>>,
/// A mutex that is held whilst we are performing operations that should not be
/// attempted by multiple clients at the same time.
client_mutex: Arc<Mutex<()>>,
}

impl ChainState {
Expand All @@ -72,7 +72,7 @@ impl ChainState {
pending_block: None,
pending_blobs,
received_certificate_trackers: HashMap::new(),
preparing_block: Arc::default(),
client_mutex: Arc::default(),
};
if let Some(block) = pending_block {
state.set_pending_block(block);
Expand Down Expand Up @@ -100,7 +100,7 @@ impl ChainState {
&self.pending_block
}

pub fn set_pending_block(&mut self, block: Block) {
pub(super) fn set_pending_block(&mut self, block: Block) {
if block.height == self.next_block_height {
self.pending_block = Some(block);
} else {
Expand All @@ -116,15 +116,15 @@ impl ChainState {
&self.pending_blobs
}

pub fn insert_pending_blob(&mut self, blob: Blob) {
pub(super) fn insert_pending_blob(&mut self, blob: Blob) {
self.pending_blobs.insert(blob.id(), blob);
}

pub fn known_key_pairs(&self) -> &BTreeMap<Owner, KeyPair> {
&self.known_key_pairs
}

pub fn insert_known_key_pair(&mut self, key_pair: KeyPair) -> PublicKey {
pub(super) fn insert_known_key_pair(&mut self, key_pair: KeyPair) -> PublicKey {
let new_public_key = key_pair.public();
self.known_key_pairs.insert(new_public_key.into(), key_pair);
new_public_key
Expand All @@ -134,7 +134,11 @@ impl ChainState {
&self.received_certificate_trackers
}

pub fn update_received_certificate_tracker(&mut self, name: ValidatorName, tracker: u64) {
pub(super) fn update_received_certificate_tracker(
&mut self,
name: ValidatorName,
tracker: u64,
) {
self.received_certificate_trackers
.entry(name)
.and_modify(|t| {
Expand All @@ -147,7 +151,7 @@ impl ChainState {
.or_insert(tracker);
}

pub fn update_from_info(&mut self, info: &ChainInfo) {
pub(super) fn update_from_info(&mut self, info: &ChainInfo) {
if info.next_block_height > self.next_block_height {
self.next_block_height = info.next_block_height;
self.clear_pending_block();
Expand All @@ -156,12 +160,12 @@ impl ChainState {
}
}

pub fn clear_pending_block(&mut self) {
pub(super) fn clear_pending_block(&mut self) {
self.pending_block = None;
self.pending_blobs.clear();
}

pub fn preparing_block(&self) -> Arc<Mutex<()>> {
self.preparing_block.clone()
pub(super) fn client_mutex(&self) -> Arc<Mutex<()>> {
self.client_mutex.clone()
}
}
12 changes: 9 additions & 3 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,9 @@ where
#[cfg(with_metrics)]
let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();

let mutex = self.state().client_mutex();
let _guard = mutex.lock_owned().await;
Comment on lines +874 to +875
Copy link
Contributor

@ma2bd ma2bd Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need two separate lines when lock_owned is used? (and otherwise why lock_owned?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually do! self.state() returns a ChainGuard that can't be held across an await point… specifically so we wouldn't ever lock an entry in the chain states map locked.

And it looks like Rust would drop that guard only after the whole expression, i.e. after the await.


// Verify that our local storage contains enough history compared to the
// expected block height. Otherwise, download the missing history from the
// network.
Expand Down Expand Up @@ -1845,8 +1848,8 @@ where
#[cfg(with_metrics)]
let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

let block_mutex = self.state().preparing_block();
let _block_guard = block_mutex.lock_owned().await;
let mutex = self.state().client_mutex();
let _guard = mutex.lock_owned().await;
match self.process_pending_block_without_prepare().await? {
ClientOutcome::Committed(Some(certificate)) => {
return Ok(ExecuteBlockOutcome::Conflict(certificate))
Expand Down Expand Up @@ -3001,10 +3004,13 @@ where
/// This is similar to `find_received_certificates` but for only one validator.
/// We also don't try to synchronize the admin chain.
#[tracing::instrument(level = "trace")]
pub async fn find_received_certificates_from_validator(
async fn find_received_certificates_from_validator(
&self,
remote_node: RemoteNode<P::Node>,
) -> Result<(), ChainClientError> {
let mutex = self.state().client_mutex();
let _guard = mutex.lock_owned().await;

let chain_id = self.chain_id;
// Proceed to downloading received certificates.
let (name, tracker, certificates) = self
Expand Down
Loading