From d85f318c38ae83e92b7908473b531aafe340dd71 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Fri, 6 Dec 2024 10:25:34 +0100 Subject: [PATCH] fix: GC executed at the beginning and add test --- kube-runtime/src/events.rs | 117 ++++++++++++++++++++++++++++++++----- 1 file changed, 102 insertions(+), 15 deletions(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 7895283dc..9c4b68987 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -307,6 +307,17 @@ impl Recorder { pub async fn publish(&self, ev: Event, reference: &ObjectReference) -> Result<(), kube_client::Error> { let now = Utc::now(); + // gc past events older than now + CACHE_TTL + self.cache.write().await.retain(|_, v| { + if let Some(series) = v.series.as_ref() { + series.last_observed_time.0 + CACHE_TTL > now + } else if let Some(event_time) = v.event_time.as_ref() { + event_time.0 + CACHE_TTL > now + } else { + true + } + }); + let key = self.get_event_key(&ev, reference); let event = match self.cache.read().await.get(&key) { Some(e) => { @@ -337,17 +348,6 @@ impl Recorder { { let mut cache = self.cache.write().await; cache.insert(key, event); - - // gc past events older than now + CACHE_TTL - cache.retain(|_, v| { - if let Some(series) = v.series.as_ref() { - series.last_observed_time.0 + CACHE_TTL > now - } else if let Some(event_time) = v.event_time.as_ref() { - event_time.0 + CACHE_TTL > now - } else { - true - } - }); } Ok(()) } @@ -355,13 +355,21 @@ impl Recorder { #[cfg(test)] mod test { - use k8s_openapi::api::{ - core::v1::{ComponentStatus, Service}, - events::v1::Event as K8sEvent, + use std::{collections::HashMap, sync::Arc}; + + use k8s_openapi::{ + api::{ + core::v1::{ComponentStatus, Service}, + events::v1::Event as K8sEvent, + }, + apimachinery::pkg::apis::meta::v1::MicroTime, + chrono::{Duration, Utc}, }; use kube::{Api, Client, Resource}; + use kube_client::api::ObjectMeta; + use tokio::sync::RwLock; - use super::{Event, EventType, Recorder}; + use super::{Event, EventKey, EventType, Recorder, Reference, Reporter}; #[tokio::test] #[ignore = "needs cluster (creates an event for the default kubernetes service)"] @@ -468,4 +476,83 @@ mod test { assert!(found_event.series.is_some()); Ok(()) } + + #[tokio::test] + #[ignore = "needs cluster (creates an event for the default kubernetes service)"] + async fn event_recorder_cache_retain() -> Result<(), Box> { + let client = Client::try_default().await?; + + let svcs: Api = Api::namespaced(client.clone(), "default"); + let s = svcs.get("kubernetes").await?; // always a kubernetes service in default + + let reference = s.object_ref(&()); + let reporter: Reporter = "kube".into(); + let ev = Event { + type_: EventType::Normal, + reason: "TestCacheTtl".into(), + note: Some("Sending kubernetes to detention".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }; + let key = EventKey { + event_type: ev.type_, + action: ev.action.clone(), + reason: ev.reason.clone(), + reporting_controller: reporter.controller.clone(), + regarding: Reference(reference.clone()), + reporting_instance: None, + related: None, + }; + + let now = Utc::now(); + let past = now - Duration::minutes(10); + let event = K8sEvent { + action: Some(ev.action.clone()), + reason: Some(ev.reason.clone()), + event_time: Some(MicroTime(past)), + regarding: Some(reference.clone()), + note: ev.note.clone().map(Into::into), + metadata: ObjectMeta { + namespace: reference.namespace.clone(), + name: Some(format!( + "{}.{:x}", + reference.name.as_ref().unwrap_or(&reporter.controller), + past.timestamp_nanos_opt().unwrap_or_else(|| past.timestamp()) + )), + ..Default::default() + }, + reporting_controller: Some(reporter.controller.clone()), + reporting_instance: Some( + reporter + .instance + .clone() + .unwrap_or_else(|| reporter.controller.clone()), + ), + type_: Some("Normal".into()), + ..Default::default() + }; + + let cache = Arc::new(RwLock::new(HashMap::new())); + + cache.write().await.insert(key.clone(), event.clone()); + + let recorder = Recorder { + client: client.clone(), + reporter: reporter.controller.into(), + cache, + }; + + recorder.publish(ev, &s.object_ref(&())).await?; + let events: Api = Api::namespaced(client, "default"); + + let event_list = events.list(&Default::default()).await?; + let found_event = event_list + .into_iter() + .find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl"))) + .unwrap(); + assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention"); + assert!(found_event.series.is_none()); + + Ok(()) + } }