Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #60 from thomastaylor312/fix/y_no_linkz
Browse files Browse the repository at this point in the history
fix(caching): Fixes issue with cached links not populating
  • Loading branch information
thomastaylor312 authored Oct 11, 2023
2 parents 0b81489 + 50950e4 commit 62fb192
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 102 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wasmcloud-control-interface"
version = "0.29.2"
version = "0.30.0"
authors = ["wasmCloud Team"]
edition = "2021"
homepage = "https://wasmcloud.com"
Expand Down
97 changes: 73 additions & 24 deletions src/kv/cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,27 @@ use futures::StreamExt;
use futures::TryStreamExt;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{debug, error};
use tracing::{debug, error, trace, Instrument};

use crate::LinkDefinition;
use crate::Result;

use super::{
delete_link, ld_hash, ld_hash_raw, put_link, KvStore, CLAIMS_PREFIX, LINKDEF_PREFIX,
delete_link, ld_hash, ld_hash_raw, put_link, Build, KvStore, CLAIMS_PREFIX, LINKDEF_PREFIX,
SUBJECT_KEY,
};

type ClaimsMap = HashMap<String, HashMap<String, String>>;

#[derive(Clone, Debug)]
/// A KV store that caches all link definitions and claims in memory as it receives updates from the
/// NATS KV bucket. This store is recommended for use in situations where there are many data
/// lookups (an example of this is Wadm).
#[derive(Clone)]
pub struct CachedKvStore {
store: Store,
linkdefs: Arc<RwLock<HashMap<String, LinkDefinition>>>,
claims: Arc<RwLock<ClaimsMap>>,
handle: Arc<JoinHandle<()>>,
}

impl Drop for CachedKvStore {
fn drop(&mut self) {
self.handle.abort();
}
_handle: Arc<WrappedHandle>,
}

impl AsRef<Store> for CachedKvStore {
Expand All @@ -48,6 +45,18 @@ impl Deref for CachedKvStore {
}
}

/// A wrapper around a JoinHandle that will abort the task when dropped. This allows it to be
/// wrapped in an Arc and cloned, but doesn't abort the task until the last arc is dropped
struct WrappedHandle {
handle: JoinHandle<()>,
}

impl Drop for WrappedHandle {
fn drop(&mut self) {
self.handle.abort();
}
}

impl CachedKvStore {
/// Create a new KV store with the given configuration. This function will do an initial fetch
/// of all claims and linkdefs from the store and then start a watcher to keep the cache up to
Expand All @@ -56,15 +65,12 @@ impl CachedKvStore {
let store = super::get_kv_store(nc, lattice_prefix, js_domain).await?;
let linkdefs = Arc::new(RwLock::new(HashMap::new()));
let claims = Arc::new(RwLock::new(ClaimsMap::default()));
let linkdefs_clone = linkdefs.clone();
let claims_clone = claims.clone();
let linkdefs_clone = Arc::clone(&linkdefs);
let claims_clone = Arc::clone(&claims);
let cloned_store = store.clone();
let (tx, rx) = tokio::sync::oneshot::channel::<Result<()>>();
let kvstore = CachedKvStore {
store,
linkdefs,
claims,
handle: Arc::new(tokio::spawn(async move {
let handle = tokio::spawn(
async move {
// We have to create this in here and use the oneshot to return the error because of
// lifetimes
let mut watcher = match cloned_store.watch_all().await {
Expand All @@ -78,6 +84,7 @@ impl CachedKvStore {
return;
}
};
debug!("Getting initial data from store");
// Start with an initial list of the data before consuming events from the watcher.
// This will ensure we have the most up to date data from the watcher (which we
// started before this step) as well as all entries from the store
Expand Down Expand Up @@ -116,13 +123,15 @@ impl CachedKvStore {
return;
}
};
debug!(num_entries = %all_entries.len(), "Finished fetching initial data, adding data to cache");

tx.send(Ok(())).unwrap();

for entry in all_entries {
handle_entry(entry, linkdefs_clone.clone(), claims_clone.clone()).await;
handle_entry(entry, Arc::clone(&linkdefs_clone), Arc::clone(&claims_clone)).await;
}

trace!("Beginning watch on store");
while let Some(event) = watcher.next().await {
let entry = match event {
Ok(en) => en,
Expand All @@ -131,18 +140,34 @@ impl CachedKvStore {
continue;
}
};
handle_entry(entry, linkdefs_clone.clone(), claims_clone.clone()).await;
trace!(key = %entry.key, bucket = %entry.bucket, operation = ?entry.operation, "Received entry from watcher, handling");
handle_entry(entry, Arc::clone(&linkdefs_clone), Arc::clone(&claims_clone)).in_current_span().await;
trace!("Finished handling entry from watcher");
}
// NOTE(thomastaylor312): We should probably do something to automatically restart
// the watch if something fails. But for now this should be ok
error!("Cache watcher has exited");
})),
}
.instrument(tracing::trace_span!("kvstore-watcher", %lattice_prefix)),
);
let kvstore = CachedKvStore {
store,
linkdefs,
claims,
_handle: Arc::new(WrappedHandle { handle }),
};
rx.await??;
Ok(kvstore)
}
}

#[async_trait::async_trait]
impl Build for CachedKvStore {
async fn build(nc: Client, lattice_prefix: &str, js_domain: Option<String>) -> Result<Self> {
CachedKvStore::new(nc, lattice_prefix, js_domain).await
}
}

#[async_trait::async_trait]
impl KvStore for CachedKvStore {
/// Return a copy of all link definitions in the store
Expand Down Expand Up @@ -252,9 +277,9 @@ async fn handle_entry(
claims: Arc<RwLock<ClaimsMap>>,
) {
if entry.key.starts_with(LINKDEF_PREFIX) {
handle_linkdef(entry, linkdefs).await;
handle_linkdef(entry, linkdefs).in_current_span().await;
} else if entry.key.starts_with(CLAIMS_PREFIX) {
handle_claim(entry, claims).await;
handle_claim(entry, claims).in_current_span().await;
} else {
debug!(key = %entry.key, "Ignoring entry with unrecognized key");
}
Expand All @@ -263,10 +288,13 @@ async fn handle_entry(
async fn handle_linkdef(entry: Entry, linkdefs: Arc<RwLock<HashMap<String, LinkDefinition>>>) {
match entry.operation {
Operation::Delete | Operation::Purge => {
trace!("Handling linkdef delete entry");
let mut linkdefs = linkdefs.write().await;
linkdefs.remove(entry.key.trim_start_matches(LINKDEF_PREFIX));
trace!(num_entries = %linkdefs.len(), "Finished handling linkdef delete entry");
}
Operation::Put => {
trace!("Handling linkdef put entry");
let ld: LinkDefinition = match serde_json::from_slice(&entry.value) {
Ok(ld) => ld,
Err(e) => {
Expand All @@ -275,18 +303,30 @@ async fn handle_linkdef(entry: Entry, linkdefs: Arc<RwLock<HashMap<String, LinkD
}
};
let key = entry.key.trim_start_matches(LINKDEF_PREFIX).to_owned();
linkdefs.write().await.insert(key, ld);
let mut lds = linkdefs.write().await;
match lds.insert(key, ld) {
Some(_) => {
trace!("Updated linkdef with new information");
}
None => {
trace!("Added new linkdef");
}
}
trace!(num_entries = %lds.len(), "Finished handling linkdef put entry");
}
}
}

async fn handle_claim(entry: Entry, claims: Arc<RwLock<ClaimsMap>>) {
match entry.operation {
Operation::Delete | Operation::Purge => {
trace!("Handling claim delete entry");
let mut claims = claims.write().await;
claims.remove(entry.key.trim_start_matches(CLAIMS_PREFIX));
trace!(num_entries = %claims.len(), "Finished handling claim delete entry");
}
Operation::Put => {
trace!("Handling claim put entry");
let json: HashMap<String, String> = match serde_json::from_slice(&entry.value) {
Ok(j) => j,
Err(e) => {
Expand All @@ -301,7 +341,16 @@ async fn handle_claim(entry: Entry, claims: Arc<RwLock<ClaimsMap>>) {
return;
}
};
claims.write().await.insert(sub, json);
let mut c = claims.write().await;
match c.insert(sub, json) {
Some(_) => {
trace!("Updated claim with new information");
}
None => {
trace!("Added new claim");
}
}
trace!(num_entries = %c.len(), "Finished handling claim put entry");
}
}
}
9 changes: 8 additions & 1 deletion src/kv/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::de::DeserializeOwned;
use tracing::{debug, error};

use super::{
delete_link, ld_hash_raw, put_link, KvStore, CLAIMS_PREFIX, LINKDEF_PREFIX, SUBJECT_KEY,
delete_link, ld_hash_raw, put_link, Build, KvStore, CLAIMS_PREFIX, LINKDEF_PREFIX, SUBJECT_KEY,
};
use crate::{types::LinkDefinition, Result};

Expand Down Expand Up @@ -130,6 +130,13 @@ impl DirectKvStore {
}
}

#[async_trait::async_trait]
impl Build for DirectKvStore {
async fn build(nc: Client, lattice_prefix: &str, js_domain: Option<String>) -> Result<Self> {
Self::new(nc, lattice_prefix, js_domain).await
}
}

#[async_trait::async_trait]
impl KvStore for DirectKvStore {
async fn get_links(&self) -> Result<Vec<LinkDefinition>> {
Expand Down
17 changes: 15 additions & 2 deletions src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ pub trait KvStore {
async fn delete_link(&self, actor_id: &str, contract_id: &str, link_name: &str) -> Result<()>;
}

/// A trait that defines the interface for a type that can build a [`KvStore`]
#[async_trait::async_trait]
pub trait Build
where
Self: Sized,
{
/// Builds a [`KvStore`] using the given NATS client and lattice prefix
async fn build(nc: Client, lattice_prefix: &str, js_domain: Option<String>) -> Result<Self>;
}

/// A helper that creates a filter function for [`get_filtered_links`](KvStore::get_filtered_links)
/// to fetch links between a specific actor and provider
pub fn actor_and_provider_filter<'a>(
Expand Down Expand Up @@ -160,9 +170,12 @@ mod test {
async fn test_get_returns_none_for_nonexistent_store() {
let client = async_nats::connect("127.0.0.1:4222").await.unwrap();

CachedKvStore::new(client, "this-lattice-shall-never-existeth", None)
if CachedKvStore::new(client, "this-lattice-shall-never-existeth", None)
.await
.expect_err("Should not be able to get a store for a non-existent lattice");
.is_ok()
{
panic!("Should not be able to get a store for a non-existent lattice");
}
}

#[rstest]
Expand Down
Loading

0 comments on commit 62fb192

Please sign in to comment.