Skip to content

Commit

Permalink
fix: flaky parquet cache for real this time
Browse files Browse the repository at this point in the history
This adds a watch channel to the parquet cache oracle that is updated each
time a prune takes place, so that anything that needs to know when a prune
has happened, such as a test, can be notified.

This should make the cache eviction test in the parquet_cache module
not flake out anymore.
  • Loading branch information
hiltontj committed Oct 3, 2024
1 parent 7d37bbb commit 48cfe2f
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions influxdb3_write/src/parquet_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use object_store::{
use observability_deps::tracing::{error, info, warn};
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
oneshot,
oneshot, watch,
};

/// Shared future type for cache values that are being fetched
Expand Down Expand Up @@ -58,6 +58,11 @@ impl CacheRequest {
pub trait ParquetCacheOracle: Send + Sync + Debug {
/// Register a cache request with the oracle
fn register(&self, cache_request: CacheRequest);

/// Get a receiver that is notified when a prune takes place
///
/// The default implementation returns `None`, i.e., implementors do not
/// provide prune notifications unless they override this method. When a
/// receiver is returned, it is a [`watch::Receiver`], so it only observes
/// the most recently sent value rather than a queue of every prune.
fn prune_notifier(&self) -> Option<watch::Receiver<usize>> {
None
}
}

/// Concrete implementation of the [`ParquetCacheOracle`]
Expand All @@ -66,6 +71,7 @@ pub trait ParquetCacheOracle: Send + Sync + Debug {
#[derive(Debug, Clone)]
pub struct MemCacheOracle {
/// Sending half of the channel used to pass [`CacheRequest`]s along; the
/// receiving half is handed to the background cache request handler in `new`
cache_request_tx: Sender<CacheRequest>,
/// Sending half of the watch channel used to announce prunes; a clone is
/// handed to the background cache pruner in `new`, and this copy is kept so
/// that new receivers can be subscribed on demand
prune_notifier_tx: watch::Sender<usize>,
}

// TODO(trevor): make this configurable with reasonable default
Expand All @@ -80,8 +86,12 @@ impl MemCacheOracle {
fn new(mem_cached_store: Arc<MemCachedObjectStore>, prune_interval: Duration) -> Self {
let (cache_request_tx, cache_request_rx) = channel(CACHE_REQUEST_BUFFER_SIZE);
background_cache_request_handler(Arc::clone(&mem_cached_store), cache_request_rx);
background_cache_pruner(mem_cached_store, prune_interval);
Self { cache_request_tx }
let (prune_notifier_tx, _prune_notifier_rx) = watch::channel(0);
background_cache_pruner(mem_cached_store, prune_notifier_tx.clone(), prune_interval);
Self {
cache_request_tx,
prune_notifier_tx,
}
}
}

Expand All @@ -94,6 +104,10 @@ impl ParquetCacheOracle for MemCacheOracle {
};
});
}

/// Get a receiver that is notified when the background pruner prunes the cache
///
/// Always returns `Some`. Each call subscribes a fresh receiver to the
/// oracle's watch channel; the value observed is the `freed` amount reported
/// by the most recent prune that actually removed entries (the channel is
/// created with an initial value of `0`).
fn prune_notifier(&self) -> Option<watch::Receiver<usize>> {
Some(self.prune_notifier_tx.subscribe())
}
}

/// Helper function for creation of a [`MemCachedObjectStore`] and [`MemCacheOracle`]
Expand Down Expand Up @@ -324,10 +338,10 @@ impl Cache {
/// Prune least recently hit entries from the cache
///
/// This is a no-op if the `used` amount on the cache is not >= its `capacity`
fn prune(&self) {
fn prune(&self) -> Option<usize> {
let used = self.used.load(Ordering::SeqCst);
if used < self.capacity {
return;
return None;
}
let n_to_prune = (self.map.len() as f64 * self.prune_percent).floor() as usize;
// use a BinaryHeap to determine the cut-off time, at which, entries that were
Expand Down Expand Up @@ -367,6 +381,8 @@ impl Cache {
}
// update used mem size with freed amount:
self.used.fetch_sub(freed, Ordering::SeqCst);

Some(freed)
}
}

Expand Down Expand Up @@ -663,14 +679,17 @@ fn background_cache_request_handler(
/// A background task for pruning un-needed entries in the cache
fn background_cache_pruner(
mem_store: Arc<MemCachedObjectStore>,
prune_notifier_tx: watch::Sender<usize>,
interval_duration: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
mem_store.cache.prune();
if let Some(freed) = mem_store.cache.prune() {
let _ = prune_notifier_tx.send(freed);
}
}
})
}
Expand Down Expand Up @@ -764,6 +783,7 @@ pub(crate) mod tests {
cache_prune_percent,
cache_prune_interval,
);
let mut prune_notifier = oracle.prune_notifier().unwrap();
// PUT an entry into the store:
let path_1 = Path::from("0.parquet");
let payload_1 = b"Janeway";
Expand Down Expand Up @@ -856,8 +876,8 @@ pub(crate) mod tests {
assert_eq!(1, inner_store.total_read_request_count(&path_2));
assert_eq!(1, inner_store.total_read_request_count(&path_3));

// allow some time for pruning:
tokio::time::sleep(Duration::from_millis(500)).await;
prune_notifier.changed().await.unwrap();
assert_eq!(23, *prune_notifier.borrow_and_update());

// GET paris from the cached store, this will not be served by the cache, because paris was
// evicted by neelix:
Expand Down

0 comments on commit 48cfe2f

Please sign in to comment.