Skip to content

Commit

Permalink
mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
mraszyk committed Oct 24, 2024
1 parent a986554 commit 20a51ba
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions rs/pocket_ic_server/src/state_api/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl std::fmt::Debug for Instance {
/// The state of the PocketIC API.
pub struct ApiState {
// impl note: If locks are acquired on both fields, acquire first on `instances` and then on `graph`.
instances: Arc<RwLock<Vec<Mutex<Instance>>>>,
instances: Arc<Mutex<Vec<Mutex<Instance>>>>,
graph: Arc<RwLock<HashMap<StateLabel, Computations>>>,
sync_wait_time: Duration,
// PocketIC server port
Expand Down Expand Up @@ -201,7 +201,7 @@ impl PocketIcApiStateBuilder {
})
})
.collect();
let instances = Arc::new(RwLock::new(instances));
let instances = Arc::new(Mutex::new(instances));

let sync_wait_time = self.sync_wait_time.unwrap_or(DEFAULT_SYNC_WAIT_DURATION);

Expand Down Expand Up @@ -658,7 +658,7 @@ impl ApiState {
// Executes an operation to completion and returns its `OpOut`
// or `None` if the auto progress mode received a stop signal.
async fn execute_operation(
instances: Arc<RwLock<Vec<Mutex<Instance>>>>,
instances: Arc<Mutex<Vec<Mutex<Instance>>>>,
graph: Arc<RwLock<HashMap<StateLabel, Computations>>>,
instance_id: InstanceId,
op: impl Operation + Send + Sync + 'static,
Expand Down Expand Up @@ -729,7 +729,7 @@ impl ApiState {
where
F: FnOnce(InstanceId) -> PocketIc + std::marker::Send + 'static,
{
let mut instances = self.instances.write().await;
let mut instances = self.instances.lock().await;
let instance_id = instances.len();
let instance = tokio::task::spawn_blocking(move || f(instance_id))
.await
Expand All @@ -745,7 +745,7 @@ impl ApiState {
pub async fn delete_instance(&self, instance_id: InstanceId) {
self.stop_progress(instance_id).await;
loop {
let instances = self.instances.read().await;
let instances = self.instances.lock().await;
trace!("grabbed instances in delete_instance; instance_id={}", instance_id);
let mut instance = instances[instance_id].lock().await;
match &instance.state {
Expand All @@ -769,7 +769,7 @@ impl ApiState {

pub async fn delete_all_instances(arc_self: Arc<ApiState>) {
let mut tasks = JoinSet::new();
let instances = arc_self.instances.read().await;
let instances = arc_self.instances.lock().await;
trace!("delete_all_instances: grabbed instances");
let num_instances = instances.len();
drop(instances);
Expand Down Expand Up @@ -1151,7 +1151,7 @@ impl ApiState {
}

async fn process_canister_http_requests(
instances: Arc<RwLock<Vec<Mutex<Instance>>>>,
instances: Arc<Mutex<Vec<Mutex<Instance>>>>,
graph: Arc<RwLock<HashMap<StateLabel, Computations>>>,
instance_id: InstanceId,
rx: &mut Receiver<()>,
Expand All @@ -1174,7 +1174,7 @@ impl ApiState {
let subnet_id = canister_http_request.subnet_id;
let request_id = canister_http_request.request_id;
let response = loop {
let instances = instances.read().await;
let instances = instances.lock().await;
trace!("process_canister_http_requests: grabbed instances; instance_id={}", instance_id);
let instance = instances[instance_id].lock().await;
if let InstanceState::Available(pocket_ic) = &instance.state {
Expand Down Expand Up @@ -1235,7 +1235,7 @@ impl ApiState {
let artificial_delay = Duration::from_millis(artificial_delay_ms.unwrap_or_default());
let instances_clone = self.instances.clone();
let graph = self.graph.clone();
let instances = self.instances.read().await;
let instances = self.instances.lock().await;
trace!("auto_progress: grabbed instances; instance_id={}", instance_id);
let mut instance = instances[instance_id].lock().await;
if instance.progress_thread.is_none() {
Expand Down Expand Up @@ -1290,7 +1290,7 @@ impl ApiState {
}

pub async fn stop_progress(&self, instance_id: InstanceId) {
let instances = self.instances.read().await;
let instances = self.instances.lock().await;
trace!("stop_progress: grabbed instances; instance_id={}", instance_id);
let mut instance = instances[instance_id].lock().await;
let progress_thread = instance.progress_thread.take();
Expand All @@ -1306,7 +1306,7 @@ impl ApiState {

pub async fn list_instance_states(&self) -> Vec<String> {
panic!("");
let instances = self.instances.read().await;
let instances = self.instances.lock().await;
let mut res = vec![];

println!("total instances: {}", instances.len());
Expand Down Expand Up @@ -1383,7 +1383,7 @@ impl ApiState {
/// Same as [Self::update] except that the timeout can be specified manually. This is useful in
/// cases when clients want to enforce a long-running blocking call.
async fn update_instances_with_timeout<O>(
instances: Arc<RwLock<Vec<Mutex<Instance>>>>,
instances: Arc<Mutex<Vec<Mutex<Instance>>>>,
graph: Arc<RwLock<HashMap<StateLabel, Computations>>>,
op: Arc<O>,
instance_id: InstanceId,
Expand All @@ -1399,7 +1399,7 @@ impl ApiState {
op_id,
);
let instances_cloned = instances.clone();
let instances_locked = instances_cloned.read().await;
let instances_locked = instances_cloned.lock().await;
trace!("update_instances_with_timeout: grabbed instances; instance_id={}", instance_id);
let (bg_task, busy_outcome) = if let Some(instance_mutex) =
instances_locked.get(instance_id)
Expand Down Expand Up @@ -1452,7 +1452,7 @@ impl ApiState {
let new_state_label = pocket_ic.get_state_label();
// add result to graph, but grab instance lock first!
println!("waiting to grab instane: old={:?} op_id={:?}", old_state_label, op_id);
let instances = instances.blocking_read();
let instances = instances.blocking_lock();
trace!("update_instances_with_timeout-thread: grabbed instances; instance_id={}", instance_id);
println!("waiting to grab graph: old={:?} op_id={:?}", old_state_label, op_id);
let mut graph_guard = graph.blocking_write();
Expand Down

0 comments on commit 20a51ba

Please sign in to comment.