1use std::{
3 collections::HashMap,
4 hash::{Hash, Hasher},
5 sync::Arc,
6};
7
8use k8s_openapi::{
9 api::{
10 core::v1::ObjectReference,
11 events::v1::{Event as K8sEvent, EventSeries},
12 },
13 apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
14 chrono::{Duration, Utc},
15};
16use kube_client::{
17 api::{Api, Patch, PatchParams, PostParams},
18 Client, ResourceExt,
19};
20use tokio::sync::RwLock;
21
22const CACHE_TTL: Duration = Duration::minutes(6);
23
24pub struct Event {
28 pub type_: EventType,
32
33 pub reason: String,
37
38 pub note: Option<String>,
42
43 pub action: String,
49
50 pub secondary: Option<ObjectReference>,
68}
69
/// Severity of an [`Event`].
///
/// The recorder publishes the variant as the string `"Normal"` or
/// `"Warning"` in the event's `type_` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    /// Published as `"Normal"`.
    Normal,
    /// Published as `"Warning"`.
    Warning,
}
78
79#[derive(Clone, Debug, PartialEq)]
83pub struct Reference(ObjectReference);
84
85impl Eq for Reference {}
86
87impl Hash for Reference {
88 fn hash<H: Hasher>(&self, state: &mut H) {
89 self.0.api_version.hash(state);
90 self.0.kind.hash(state);
91 self.0.name.hash(state);
92 self.0.namespace.hash(state);
93 self.0.uid.hash(state);
94 }
95}
96
97#[derive(Clone, Debug, PartialEq, Eq, Hash)]
99struct EventKey {
100 pub event_type: EventType,
101 pub action: String,
102 pub reason: String,
103 pub reporting_controller: String,
104 pub reporting_instance: Option<String>,
105 pub regarding: Reference,
106 pub related: Option<Reference>,
107}
108
/// Identity under which a [`Recorder`] publishes events.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Reporter {
    /// Name of the reporting controller; published as `reporting_controller`.
    pub controller: String,

    /// Optional identifier of this particular controller instance.
    ///
    /// When `None`, the recorder publishes the controller name as
    /// `reporting_instance` instead.
    pub instance: Option<String>,
}
148
149impl From<String> for Reporter {
151 fn from(es: String) -> Self {
152 Self {
153 controller: es,
154 instance: None,
155 }
156 }
157}
158
159impl From<&str> for Reporter {
160 fn from(es: &str) -> Self {
161 let instance = hostname::get().ok().and_then(|h| h.into_string().ok());
162 Self {
163 controller: es.into(),
164 instance,
165 }
166 }
167}
168
169#[derive(Clone)]
222pub struct Recorder {
223 client: Client,
224 reporter: Reporter,
225 cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
226}
227
228impl Recorder {
229 #[must_use]
235 pub fn new(client: Client, reporter: Reporter) -> Self {
236 let cache = Arc::default();
237 Self {
238 client,
239 reporter,
240 cache,
241 }
242 }
243
244 fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey {
247 EventKey {
248 event_type: ev.type_,
249 action: ev.action.clone(),
250 reason: ev.reason.clone(),
251 reporting_controller: self.reporter.controller.clone(),
252 reporting_instance: self.reporter.instance.clone(),
253 regarding: Reference(regarding.clone()),
254 related: ev.secondary.clone().map(Reference),
255 }
256 }
257
258 fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent {
262 let now = Utc::now();
263 K8sEvent {
264 action: Some(ev.action.clone()),
265 reason: Some(ev.reason.clone()),
266 deprecated_count: None,
267 deprecated_first_timestamp: None,
268 deprecated_last_timestamp: None,
269 deprecated_source: None,
270 event_time: Some(MicroTime(now)),
271 regarding: Some(reference.clone()),
272 note: ev.note.clone(),
273 metadata: ObjectMeta {
274 namespace: reference.namespace.clone(),
275 name: Some(format!(
276 "{}.{:x}",
277 reference.name.as_ref().unwrap_or(&self.reporter.controller),
278 now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp())
279 )),
280 ..Default::default()
281 },
282 reporting_controller: Some(self.reporter.controller.clone()),
283 reporting_instance: Some(
284 self.reporter
285 .instance
286 .clone()
287 .unwrap_or_else(|| self.reporter.controller.clone()),
288 ),
289 series: None,
290 type_: match ev.type_ {
291 EventType::Normal => Some("Normal".into()),
292 EventType::Warning => Some("Warning".into()),
293 },
294 related: ev.secondary.clone(),
295 }
296 }
297
298 pub async fn publish(&self, ev: &Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
310 let now = Utc::now();
311
312 self.cache.write().await.retain(|_, v| {
314 if let Some(series) = v.series.as_ref() {
315 series.last_observed_time.0 + CACHE_TTL > now
316 } else if let Some(event_time) = v.event_time.as_ref() {
317 event_time.0 + CACHE_TTL > now
318 } else {
319 true
320 }
321 });
322
323 let key = self.get_event_key(ev, reference);
324 let event = match self.cache.read().await.get(&key) {
325 Some(e) => {
326 let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
327 let series = EventSeries {
328 count,
329 last_observed_time: MicroTime(now),
330 };
331 let mut event = e.clone();
332 event.series = Some(series);
333 event
334 }
335 None => self.generate_event(ev, reference),
336 };
337
338 let events = Api::namespaced(
339 self.client.clone(),
340 reference.namespace.as_ref().unwrap_or(&"default".to_string()),
341 );
342 if event.series.is_some() {
343 events
344 .patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event))
345 .await?;
346 } else {
347 events.create(&PostParams::default(), &event).await?;
348 }
349
350 {
351 let mut cache = self.cache.write().await;
352 cache.insert(key, event);
353 }
354 Ok(())
355 }
356}
357
#[cfg(test)]
mod test {
    use super::{Event, EventKey, EventType, Recorder, Reference, Reporter};

    use k8s_openapi::{
        api::{
            core::v1::{ComponentStatus, Service},
            events::v1::Event as K8sEvent,
        },
        apimachinery::pkg::apis::meta::v1::MicroTime,
        chrono::{Duration, Utc},
    };
    use kube::{Api, Client, Resource};

    type TestResult<T = ()> = Result<T, Box<dyn std::error::Error>>;

    /// Builds a `Normal` event with the action text shared by all tests here.
    fn test_event(reason: &str, note: &str) -> Event {
        Event {
            type_: EventType::Normal,
            reason: reason.into(),
            note: Some(note.into()),
            action: "Test event - plz ignore".into(),
            secondary: None,
        }
    }

    /// Lists `events` and returns the first one whose reason equals `reason`.
    async fn first_event_with_reason(events: &Api<K8sEvent>, reason: &str) -> TestResult<Option<K8sEvent>> {
        let list = events.list(&Default::default()).await?;
        Ok(list.into_iter().find(|e| e.reason.as_deref() == Some(reason)))
    }

    /// Publishing the same event twice creates it and then attaches a series.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_attaches_events() -> TestResult {
        let client = Client::try_default().await?;

        let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
        let s = svcs.get("kubernetes").await?;
        let reference = s.object_ref(&());
        let recorder = Recorder::new(client.clone(), "kube".into());
        let events: Api<K8sEvent> = Api::namespaced(client, "default");

        recorder
            .publish(
                &test_event("VeryCoolService", "Sending kubernetes to detention"),
                &reference,
            )
            .await?;
        let found_event = first_event_with_reason(&events, "VeryCoolService")
            .await?
            .expect("published event should be listed");
        assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");

        recorder
            .publish(
                &test_event("VeryCoolService", "Sending kubernetes to detention twice"),
                &reference,
            )
            .await?;
        let found_event = first_event_with_reason(&events, "VeryCoolService")
            .await?
            .expect("published event should be listed");
        assert!(found_event.series.is_some());

        Ok(())
    }

    /// Events about a cluster-scoped object end up in the `default` namespace.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the scheduler componentstatus)"]
    async fn event_recorder_attaches_events_without_namespace() -> TestResult {
        let client = Client::try_default().await?;

        let component_status_api: Api<ComponentStatus> = Api::all(client.clone());
        let s = component_status_api.get("scheduler").await?;
        let reference = s.object_ref(&());
        let recorder = Recorder::new(client.clone(), "kube".into());
        let events: Api<K8sEvent> = Api::namespaced(client, "default");

        recorder
            .publish(
                &test_event(
                    "VeryCoolServiceNoNamespace",
                    "Sending kubernetes to detention without namespace",
                ),
                &reference,
            )
            .await?;
        let found_event = first_event_with_reason(&events, "VeryCoolServiceNoNamespace")
            .await?
            .expect("published event should be listed");
        assert_eq!(
            found_event.note.unwrap(),
            "Sending kubernetes to detention without namespace"
        );

        recorder
            .publish(
                &test_event(
                    "VeryCoolServiceNoNamespace",
                    "Sending kubernetes to detention without namespace twice",
                ),
                &reference,
            )
            .await?;
        let found_event = first_event_with_reason(&events, "VeryCoolServiceNoNamespace")
            .await?
            .expect("published event should be listed");
        assert!(found_event.series.is_some());

        Ok(())
    }

    /// An expired cache entry is evicted, so a repeat is created as a new
    /// event instead of being aggregated into a series.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_cache_retain() -> TestResult {
        let client = Client::try_default().await?;

        let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
        let s = svcs.get("kubernetes").await?;
        let reference = s.object_ref(&());

        // Built explicitly (not via `"kube".into()`) so that `instance` is
        // `None` and the key below matches the one the recorder computes.
        let reporter = Reporter {
            controller: "kube".into(),
            instance: None,
        };
        let ev = test_event("TestCacheTtl", "Sending kubernetes to detention");
        let key = EventKey {
            event_type: ev.type_,
            action: ev.action.clone(),
            reason: ev.reason.clone(),
            reporting_controller: reporter.controller.clone(),
            reporting_instance: reporter.instance.clone(),
            regarding: Reference(reference.clone()),
            related: None,
        };
        let recorder = Recorder::new(client.clone(), reporter);

        recorder.publish(&ev, &reference).await?;

        // Age the cached entry beyond the TTL; fail loudly if the key does not
        // match instead of silently skipping the modification.
        let past = Utc::now() - Duration::minutes(10);
        recorder
            .cache
            .write()
            .await
            .get_mut(&key)
            .expect("published event should be cached under the expected key")
            .event_time = Some(MicroTime(past));

        recorder.publish(&ev, &reference).await?;

        let events: Api<K8sEvent> = Api::namespaced(client, "default");
        let found_event = first_event_with_reason(&events, "TestCacheTtl")
            .await?
            .expect("published event should be listed");
        assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
        assert!(found_event.series.is_none());

        Ok(())
    }
}