kube_runtime/
events.rs

1//! Publishes events for objects for kubernetes >= 1.19
2use std::{
3    collections::HashMap,
4    hash::{Hash, Hasher},
5    sync::Arc,
6};
7
8use k8s_openapi::{
9    api::{
10        core::v1::ObjectReference,
11        events::v1::{Event as K8sEvent, EventSeries},
12    },
13    apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
14    chrono::{Duration, Utc},
15};
16use kube_client::{
17    api::{Api, Patch, PatchParams, PostParams},
18    Client, ResourceExt,
19};
20use tokio::sync::RwLock;
21
22const CACHE_TTL: Duration = Duration::minutes(6);
23
24/// Minimal event type for publishing through [`Recorder::publish`].
25///
26/// All string fields must be human readable.
27pub struct Event {
28    /// The event severity.
29    ///
30    /// Shows up in `kubectl describe` as `Type`.
31    pub type_: EventType,
32
33    /// The short reason explaining why the `action` was taken.
34    ///
35    /// This must be at most 128 characters, generally in `PascalCase`. Shows up in `kubectl describe` as `Reason`.
36    pub reason: String,
37
38    /// A optional description of the status of the `action`.
39    ///
40    /// This must be at most 1kB in size. Shows up in `kubectl describe` as `Message`.
41    pub note: Option<String>,
42
43    /// The action that was taken (either successfully or unsuccessfully) against main object
44    ///
45    /// This must be at most 128 characters. It does not currently show up in `kubectl describe`.
46    /// A common convention is a short identifier of the action that caused the outcome described in `reason`.
47    /// Usually denoted in `PascalCase`.
48    pub action: String,
49
50    /// Optional secondary object related to the main object
51    ///
52    /// Some events are emitted for actions that affect multiple objects.
53    /// `secondary` can be populated to capture this detail.
54    ///
55    /// For example: the event concerns a `Deployment` and it affects the current `ReplicaSet` underneath it.
56    /// You would therefore populate `events` using the object reference of the `ReplicaSet`.
57    ///
58    /// Set `secondary` to `None`, instead, if the event affects only the object whose reference
59    /// you passed to [`Recorder::new`].
60    ///
61    /// # Naming note
62    ///
63    /// `secondary` is mapped to `related` in
64    /// [`Events API`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io).
65    ///
66    /// [`Recorder::new`]: crate::events::Recorder::new
67    pub secondary: Option<ObjectReference>,
68}
69
70/// The event severity or type.
71#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
72pub enum EventType {
73    /// An event took place - nothing to worry about.
74    Normal,
75    /// Something is not working as expected - it might be worth to have a look.
76    Warning,
77}
78
79/// [`ObjectReference`] with Hash and Eq implementations
80///
81/// [`ObjectReference`]: k8s_openapi::api::core::v1::ObjectReference
82#[derive(Clone, Debug, PartialEq)]
83pub struct Reference(ObjectReference);
84
85impl Eq for Reference {}
86
87impl Hash for Reference {
88    fn hash<H: Hasher>(&self, state: &mut H) {
89        self.0.api_version.hash(state);
90        self.0.kind.hash(state);
91        self.0.name.hash(state);
92        self.0.namespace.hash(state);
93        self.0.uid.hash(state);
94    }
95}
96
97/// Cache key for event deduplication
98#[derive(Clone, Debug, PartialEq, Eq, Hash)]
99struct EventKey {
100    pub event_type: EventType,
101    pub action: String,
102    pub reason: String,
103    pub reporting_controller: String,
104    pub reporting_instance: Option<String>,
105    pub regarding: Reference,
106    pub related: Option<Reference>,
107}
108
109/// Information about the reporting controller.
110///
111/// ```
112/// use kube::runtime::events::Reporter;
113///
114/// let reporter = Reporter {
115///     controller: "my-awesome-controller".into(),
116///     instance: std::env::var("CONTROLLER_POD_NAME").ok(),
117/// };
118/// ```
119#[derive(Clone, Debug, PartialEq, Eq, Hash)]
120pub struct Reporter {
121    /// The name of the reporting controller that is publishing the event.
122    ///
123    /// This is likely your deployment.metadata.name.
124    pub controller: String,
125
126    /// The id of the controller publishing the event. Likely your pod name.
127    ///
128    /// Useful when running more than one replica on your controller and you need to disambiguate
129    /// where events came from.
130    ///
131    /// The name of the controller pod can be retrieved using Kubernetes' API or
132    /// it can be injected as an environment variable using
133    ///
134    /// ```yaml
135    /// env:
136    ///   - name: CONTROLLER_POD_NAME
137    ///     valueFrom:
138    ///       fieldRef:
139    ///         fieldPath: metadata.name
140    /// ```
141    ///
142    /// in the manifest of your controller.
143    ///
144    /// Note: If `instance` is not provided, the hostname is used. If the hostname is also
145    /// unavailable, `reporting_instance` defaults to `reporting_controller` in the `Event`.
146    pub instance: Option<String>,
147}
148
149// simple conversions for when instance == controller
150impl From<String> for Reporter {
151    fn from(es: String) -> Self {
152        Self {
153            controller: es,
154            instance: None,
155        }
156    }
157}
158
159impl From<&str> for Reporter {
160    fn from(es: &str) -> Self {
161        let instance = hostname::get().ok().and_then(|h| h.into_string().ok());
162        Self {
163            controller: es.into(),
164            instance,
165        }
166    }
167}
168
169/// A publisher abstraction to emit Kubernetes' events.
170///
171/// All events emitted by an `Recorder` are attached to the [`ObjectReference`]
172/// specified when building the recorder using [`Recorder::new`].
173///
174/// ```
175/// use kube::runtime::events::{Reporter, Recorder, Event, EventType};
176/// use k8s_openapi::api::core::v1::ObjectReference;
177///
178/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
179/// # let client: kube::Client = todo!();
180/// let reporter = Reporter {
181///     controller: "my-awesome-controller".into(),
182///     instance: std::env::var("CONTROLLER_POD_NAME").ok(),
183/// };
184///
185/// let recorder = Recorder::new(client, reporter);
186///
187/// // references can be made manually using `ObjectMeta` and `ApiResource`/`Resource` info
188/// let reference = ObjectReference {
189///     // [...]
190///     ..Default::default()
191/// };
192/// // or for k8s-openapi / kube-derive types, use Resource::object_ref:
193/// // let reference = myobject.object_ref();
194/// recorder
195///     .publish(
196///         &Event {
197///             action: "Scheduling".into(),
198///             reason: "Pulling".into(),
199///             note: Some("Pulling image `nginx`".into()),
200///             type_: EventType::Normal,
201///             secondary: None,
202///         },
203///         &reference,
204///     ).await?;
205/// # Ok(())
206/// # }
207/// ```
208///
209/// Events attached to an object will be shown in the `Events` section of the output of
210/// of `kubectl describe` for that object.
211///
212/// ## RBAC
213///
214/// Note that usage of the event recorder minimally requires the following RBAC rules:
215///
216/// ```yaml
217/// - apiGroups: ["events.k8s.io"]
218///   resources: ["events"]
219///   verbs: ["create", "patch"]
220/// ```
221#[derive(Clone)]
222pub struct Recorder {
223    client: Client,
224    reporter: Reporter,
225    cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
226}
227
228impl Recorder {
229    /// Create a new recorder that can publish events for one specific object
230    ///
231    /// This is intended to be created at the start of your controller's reconcile fn.
232    ///
233    /// Cluster scoped objects will publish events in the "default" namespace.
234    #[must_use]
235    pub fn new(client: Client, reporter: Reporter) -> Self {
236        let cache = Arc::default();
237        Self {
238            client,
239            reporter,
240            cache,
241        }
242    }
243
244    /// Builds unique event key based on reportingController, reportingInstance, regarding, reason
245    ///  and note
246    fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey {
247        EventKey {
248            event_type: ev.type_,
249            action: ev.action.clone(),
250            reason: ev.reason.clone(),
251            reporting_controller: self.reporter.controller.clone(),
252            reporting_instance: self.reporter.instance.clone(),
253            regarding: Reference(regarding.clone()),
254            related: ev.secondary.clone().map(Reference),
255        }
256    }
257
258    // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io
259    // for more detail on the fields
260    // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
261    fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent {
262        let now = Utc::now();
263        K8sEvent {
264            action: Some(ev.action.clone()),
265            reason: Some(ev.reason.clone()),
266            deprecated_count: None,
267            deprecated_first_timestamp: None,
268            deprecated_last_timestamp: None,
269            deprecated_source: None,
270            event_time: Some(MicroTime(now)),
271            regarding: Some(reference.clone()),
272            note: ev.note.clone(),
273            metadata: ObjectMeta {
274                namespace: reference.namespace.clone(),
275                name: Some(format!(
276                    "{}.{:x}",
277                    reference.name.as_ref().unwrap_or(&self.reporter.controller),
278                    now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp())
279                )),
280                ..Default::default()
281            },
282            reporting_controller: Some(self.reporter.controller.clone()),
283            reporting_instance: Some(
284                self.reporter
285                    .instance
286                    .clone()
287                    .unwrap_or_else(|| self.reporter.controller.clone()),
288            ),
289            series: None,
290            type_: match ev.type_ {
291                EventType::Normal => Some("Normal".into()),
292                EventType::Warning => Some("Warning".into()),
293            },
294            related: ev.secondary.clone(),
295        }
296    }
297
298    /// Publish a new Kubernetes' event.
299    ///
300    /// # Access control
301    ///
302    /// The event object is created in the same namespace of the [`ObjectReference`].
303    /// Make sure that your controller has `create` permissions in the required namespaces
304    /// for the `event` resource in the API group `events.k8s.io`.
305    ///
306    /// # Errors
307    ///
308    /// Returns an [`Error`](`kube_client::Error`) if the event is rejected by Kubernetes.
309    pub async fn publish(&self, ev: &Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
310        let now = Utc::now();
311
312        // gc past events older than now + CACHE_TTL
313        self.cache.write().await.retain(|_, v| {
314            if let Some(series) = v.series.as_ref() {
315                series.last_observed_time.0 + CACHE_TTL > now
316            } else if let Some(event_time) = v.event_time.as_ref() {
317                event_time.0 + CACHE_TTL > now
318            } else {
319                true
320            }
321        });
322
323        let key = self.get_event_key(ev, reference);
324        let event = match self.cache.read().await.get(&key) {
325            Some(e) => {
326                let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
327                let series = EventSeries {
328                    count,
329                    last_observed_time: MicroTime(now),
330                };
331                let mut event = e.clone();
332                event.series = Some(series);
333                event
334            }
335            None => self.generate_event(ev, reference),
336        };
337
338        let events = Api::namespaced(
339            self.client.clone(),
340            reference.namespace.as_ref().unwrap_or(&"default".to_string()),
341        );
342        if event.series.is_some() {
343            events
344                .patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event))
345                .await?;
346        } else {
347            events.create(&PostParams::default(), &event).await?;
348        }
349
350        {
351            let mut cache = self.cache.write().await;
352            cache.insert(key, event);
353        }
354        Ok(())
355    }
356}
357
358#[cfg(test)]
359mod test {
360    use super::{Event, EventKey, EventType, Recorder, Reference, Reporter};
361
362    use k8s_openapi::{
363        api::{
364            core::v1::{ComponentStatus, Service},
365            events::v1::Event as K8sEvent,
366        },
367        apimachinery::pkg::apis::meta::v1::MicroTime,
368        chrono::{Duration, Utc},
369    };
370    use kube::{Api, Client, Resource};
371
372    #[tokio::test]
373    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
374    async fn event_recorder_attaches_events() -> Result<(), Box<dyn std::error::Error>> {
375        let client = Client::try_default().await?;
376
377        let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
378        let s = svcs.get("kubernetes").await?; // always a kubernetes service in default
379        let recorder = Recorder::new(client.clone(), "kube".into());
380        recorder
381            .publish(
382                &Event {
383                    type_: EventType::Normal,
384                    reason: "VeryCoolService".into(),
385                    note: Some("Sending kubernetes to detention".into()),
386                    action: "Test event - plz ignore".into(),
387                    secondary: None,
388                },
389                &s.object_ref(&()),
390            )
391            .await?;
392        let events: Api<K8sEvent> = Api::namespaced(client, "default");
393
394        let event_list = events.list(&Default::default()).await?;
395        let found_event = event_list
396            .into_iter()
397            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
398            .unwrap();
399        assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
400
401        recorder
402            .publish(
403                &Event {
404                    type_: EventType::Normal,
405                    reason: "VeryCoolService".into(),
406                    note: Some("Sending kubernetes to detention twice".into()),
407                    action: "Test event - plz ignore".into(),
408                    secondary: None,
409                },
410                &s.object_ref(&()),
411            )
412            .await?;
413
414        let event_list = events.list(&Default::default()).await?;
415        let found_event = event_list
416            .into_iter()
417            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
418            .unwrap();
419        assert!(found_event.series.is_some());
420
421        Ok(())
422    }
423
424    #[tokio::test]
425    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
426    async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box<dyn std::error::Error>> {
427        let client = Client::try_default().await?;
428
429        let component_status_api: Api<ComponentStatus> = Api::all(client.clone());
430        let s = component_status_api.get("scheduler").await?;
431        let recorder = Recorder::new(client.clone(), "kube".into());
432        recorder
433            .publish(
434                &Event {
435                    type_: EventType::Normal,
436                    reason: "VeryCoolServiceNoNamespace".into(),
437                    note: Some("Sending kubernetes to detention without namespace".into()),
438                    action: "Test event - plz ignore".into(),
439                    secondary: None,
440                },
441                &s.object_ref(&()),
442            )
443            .await?;
444        let events: Api<K8sEvent> = Api::namespaced(client, "default");
445
446        let event_list = events.list(&Default::default()).await?;
447        let found_event = event_list
448            .into_iter()
449            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
450            .unwrap();
451        assert_eq!(
452            found_event.note.unwrap(),
453            "Sending kubernetes to detention without namespace"
454        );
455
456        recorder
457            .publish(
458                &Event {
459                    type_: EventType::Normal,
460                    reason: "VeryCoolServiceNoNamespace".into(),
461                    note: Some("Sending kubernetes to detention without namespace twice".into()),
462                    action: "Test event - plz ignore".into(),
463                    secondary: None,
464                },
465                &s.object_ref(&()),
466            )
467            .await?;
468
469        let event_list = events.list(&Default::default()).await?;
470        let found_event = event_list
471            .into_iter()
472            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
473            .unwrap();
474        assert!(found_event.series.is_some());
475        Ok(())
476    }
477
478    #[tokio::test]
479    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
480    async fn event_recorder_cache_retain() -> Result<(), Box<dyn std::error::Error>> {
481        let client = Client::try_default().await?;
482
483        let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
484        let s = svcs.get("kubernetes").await?; // always a kubernetes service in default
485
486        let reference = s.object_ref(&());
487        let reporter: Reporter = "kube".into();
488        let ev = Event {
489            type_: EventType::Normal,
490            reason: "TestCacheTtl".into(),
491            note: Some("Sending kubernetes to detention".into()),
492            action: "Test event - plz ignore".into(),
493            secondary: None,
494        };
495        let key = EventKey {
496            event_type: ev.type_,
497            action: ev.action.clone(),
498            reason: ev.reason.clone(),
499            reporting_controller: reporter.controller.clone(),
500            regarding: Reference(reference.clone()),
501            reporting_instance: None,
502            related: None,
503        };
504
505        let reporter = Reporter {
506            controller: "kube".into(),
507            instance: None,
508        };
509        let recorder = Recorder::new(client.clone(), reporter);
510
511        recorder.publish(&ev, &s.object_ref(&())).await?;
512        let now = Utc::now();
513        let past = now - Duration::minutes(10);
514        recorder.cache.write().await.entry(key).and_modify(|e| {
515            e.event_time = Some(MicroTime(past));
516        });
517
518        recorder.publish(&ev, &s.object_ref(&())).await?;
519
520        let events: Api<K8sEvent> = Api::namespaced(client, "default");
521        let event_list = events.list(&Default::default()).await?;
522        let found_event = event_list
523            .into_iter()
524            .find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl")))
525            .unwrap();
526        assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
527        assert!(found_event.series.is_none());
528
529        Ok(())
530    }
531}