Skip to content

Commit

Permalink
Fix cancellation issues in request cache map
Browse files Browse the repository at this point in the history
  • Loading branch information
filiptibell committed Sep 15, 2023
1 parent c526fa5 commit 8fa04a5
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 52 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed

- Fixed hovers sometimes not appearing and needing to re-hover over the same location

## `0.0.3` - September 14th, 2023

### Fixed
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ lto = true

[dependencies]
async-channel = "1.9"
async-lock = "2.8"
bytes = "1.0"
dashmap = "5.5"
futures = "0.3"
Expand Down
78 changes: 26 additions & 52 deletions src/util/requests/cache_map.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
use std::{sync::Arc, time::Duration};

use async_channel::{bounded, Receiver};
use async_lock::Semaphore;
use dashmap::DashMap;
use futures::Future;
use moka::future::Cache;
use smol::Timer;
use tracing::trace;

type CacheMap<T> = Cache<String, T>;

// Map of senders, used to notify any listeners
// that are waiting for a request to finish and
// a cache value to become available to clone
type Receivers<T> = Arc<DashMap<String, Receiver<T>>>;
type Semaphores = Arc<DashMap<String, Semaphore>>;

/**
Generic cache map for web requests.
Expand All @@ -22,7 +17,7 @@ type Receivers<T> = Arc<DashMap<String, Receiver<T>>>;
#[derive(Debug, Clone)]
pub struct RequestCacheMap<T: Clone + Send + Sync + 'static> {
map: CacheMap<T>,
recvs: Receivers<T>,
sems: Semaphores,
}

impl<T: Clone + Send + Sync + 'static> RequestCacheMap<T> {
Expand All @@ -42,7 +37,7 @@ impl<T: Clone + Send + Sync + 'static> RequestCacheMap<T> {
.build();
RequestCacheMap {
map,
recvs: Arc::new(DashMap::new()),
sems: Arc::new(DashMap::new()),
}
}

Expand Down Expand Up @@ -87,51 +82,30 @@ impl<T: Clone + Send + Sync + 'static> RequestCacheMap<T> {
{
let key = key.into();

let recvs = Arc::clone(&self.recvs);
let recv = recvs.get(&key).map(|r| r.clone());

if let Some(recv) = recv {
match recv.recv().await {
Ok(res) => {
// Got cached value, either old or just produced
return res;
}
Err(_) => {
// Existing request was cancelled / dropped, try again
}
}
// Return cached value right away if possible
if let Some(cached) = self.map.get(&key) {
trace!("Cache hit (1): {key}");
return cached.clone();
}

match self.map.get(&key) {
Some(cached) => cached.clone(),
None => {
let (send, recv) = bounded(1);
recvs.insert(key.clone(), recv);

// HACK: Spawn a timeout task that will clear out any
// senders if for some reason this request was cancelled
// We should really do this on future being dropped instead
let sends_key = key.clone();
let sends_timeout = Arc::clone(&recvs);
smol::spawn(async move {
Timer::after(Duration::from_secs(30)).await;
if sends_timeout.remove(&sends_key).is_some() {
trace!("Request was cancelled, cleaning up")
}
})
.detach();

let result = f.await;

self.map.insert(key.clone(), result.clone()).await;

recvs
.remove(&key)
.expect("Cache receiver was removed unexpectedly");
send.try_send(result.clone()).ok();

result
}
// Wait for permission to try to perform the request -
// guarantees at most one requester at a time per key
let sem = self
.sems
.entry(key.clone())
.or_insert_with(|| Semaphore::new(1));
let _guard = sem.acquire().await;

// We have permission, but the cache may have been updated, check again
if let Some(cached) = self.map.get(&key) {
trace!("Cache hit (2): {key}");
return cached.clone();
}

// Not cached, and we have permission, so perform the request
trace!("Performing cached request: {key}");
let result = f.await;
self.map.insert(key.clone(), result.clone()).await;
result
}
}

0 comments on commit 8fa04a5

Please sign in to comment.