// kube_runtime/events.rs
//! Publishes events for objects for kubernetes >= 1.19
use std::{
    collections::HashMap,
    hash::{Hash, Hasher},
    sync::Arc,
};

use k8s_openapi::{
    api::{
        core::v1::ObjectReference,
        events::v1::{Event as K8sEvent, EventSeries},
    },
    apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
    chrono::{Duration, Utc},
};
use kube_client::{
    api::{Api, Patch, PatchParams, PostParams},
    Client, ResourceExt,
};
use tokio::sync::RwLock;

/// How long an event stays in the recorder's deduplication cache after it was last observed.
///
/// `Recorder::publish` evicts entries whose last observation (series time, else event time)
/// is older than this before looking up the event being published.
const CACHE_TTL: Duration = Duration::minutes(6);

/// Minimal event type for publishing through [`Recorder::publish`].
///
/// All string fields must be human readable.
pub struct Event {
    /// The event severity.
    ///
    /// Shows up in `kubectl describe` as `Type`.
    pub type_: EventType,

    /// The short reason explaining why the `action` was taken.
    ///
    /// This must be at most 128 characters, generally in `PascalCase`. Shows up in `kubectl describe` as `Reason`.
    pub reason: String,

    /// An optional description of the status of the `action`.
    ///
    /// This must be at most 1kB in size. Shows up in `kubectl describe` as `Message`.
    pub note: Option<String>,

    /// The action that was taken (either successfully or unsuccessfully) against the main object.
    ///
    /// This must be at most 128 characters. It does not currently show up in `kubectl describe`.
    /// A common convention is a short identifier of the action that caused the outcome described in `reason`.
    /// Usually denoted in `PascalCase`.
    pub action: String,

    /// Optional secondary object related to the main object.
    ///
    /// Some events are emitted for actions that affect multiple objects.
    /// `secondary` can be populated to capture this detail.
    ///
    /// For example: the event concerns a `Deployment` and it affects the current `ReplicaSet` underneath it.
    /// You would therefore populate `secondary` using the object reference of the `ReplicaSet`.
    ///
    /// Set `secondary` to `None`, instead, if the event affects only the object whose reference
    /// you pass to [`Recorder::publish`].
    ///
    /// # Naming note
    ///
    /// `secondary` is mapped to `related` in
    /// [`Events API`](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io).
    ///
    /// [`Recorder::publish`]: crate::events::Recorder::publish
    pub secondary: Option<ObjectReference>,
}

/// The event severity or type.
///
/// The recorder writes this to the published event's `type_` field as the string
/// `"Normal"` or `"Warning"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum EventType {
    /// An event took place - nothing to worry about.
    Normal,
    /// Something is not working as expected - it might be worth having a look.
    Warning,
}

/// [`ObjectReference`] with Hash and Eq implementations
///
/// [`ObjectReference`]: k8s_openapi::api::core::v1::ObjectReference
#[derive(Clone, Debug, PartialEq)]
pub struct Reference(ObjectReference);

impl Eq for Reference {}

impl Hash for Reference {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.api_version.hash(state);
        self.0.kind.hash(state);
        self.0.name.hash(state);
        self.0.namespace.hash(state);
        self.0.uid.hash(state);
    }
}

/// Cache key for event deduplication
///
/// Two published events that produce an equal key are treated as the same recurring
/// event by `Recorder::publish`. The event's `note` is not part of the key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct EventKey {
    // Severity of the event.
    pub event_type: EventType,
    // Copied from `Event::action`.
    pub action: String,
    // Copied from `Event::reason`.
    pub reason: String,
    // Copied from `Reporter::controller`.
    pub reporting_controller: String,
    // Copied from `Reporter::instance` as-is (no fallback applied).
    pub reporting_instance: Option<String>,
    // The object the event is about.
    pub regarding: Reference,
    // Copied from `Event::secondary`, if any.
    pub related: Option<Reference>,
}

/// Information about the reporting controller.
///
/// ```
/// use kube::runtime::events::Reporter;
///
/// let reporter = Reporter {
///     controller: "my-awesome-controller".into(),
///     instance: std::env::var("CONTROLLER_POD_NAME").ok(),
/// };
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Reporter {
    /// The name of the reporting controller that is publishing the event.
    ///
    /// This is likely your deployment.metadata.name.
    pub controller: String,

    /// The id of the controller publishing the event. Likely your pod name.
    ///
    /// Useful when running more than one replica on your controller and you need to disambiguate
    /// where events came from.
    ///
    /// The name of the controller pod can be retrieved using Kubernetes' API or
    /// it can be injected as an environment variable using
    ///
    /// ```yaml
    /// env:
    ///   - name: CONTROLLER_POD_NAME
    ///     valueFrom:
    ///       fieldRef:
    ///         fieldPath: metadata.name
    /// ```
    ///
    /// in the manifest of your controller.
    ///
    /// Note: a `Reporter` built through `From<&str>` fills `instance` with the machine
    /// hostname when it can be determined. Whenever `instance` ends up `None`,
    /// `reporting_instance` defaults to `reporting_controller` in the `Event`.
    pub instance: Option<String>,
}

// Simple conversions from a bare controller name.
//
// Both conversions behave the same: `instance` is taken from the machine hostname when it
// is available and valid UTF-8, and is left as `None` otherwise (in which case the
// published event falls back to the controller name for `reporting_instance`).
impl From<String> for Reporter {
    /// Builds a reporter named `es`, using the hostname (if obtainable) as the instance.
    fn from(es: String) -> Self {
        Self {
            controller: es,
            // Best effort: any lookup or UTF-8 conversion failure simply yields `None`.
            instance: hostname::get().ok().and_then(|h| h.into_string().ok()),
        }
    }
}

impl From<&str> for Reporter {
    /// Builds a reporter named `es`; identical to the `From<String>` conversion.
    fn from(es: &str) -> Self {
        Self::from(es.to_owned())
    }
}

/// A publisher abstraction to emit Kubernetes' events.
///
/// Every event emitted by a `Recorder` is attached to the [`ObjectReference`]
/// passed to [`Recorder::publish`] alongside it.
///
/// ```
/// use kube::runtime::events::{Reporter, Recorder, Event, EventType};
/// use k8s_openapi::api::core::v1::ObjectReference;
///
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let reporter = Reporter {
///     controller: "my-awesome-controller".into(),
///     instance: std::env::var("CONTROLLER_POD_NAME").ok(),
/// };
///
/// let recorder = Recorder::new(client, reporter);
///
/// // references can be made manually using `ObjectMeta` and `ApiResource`/`Resource` info
/// let reference = ObjectReference {
///     // [...]
///     ..Default::default()
/// };
/// // or for k8s-openapi / kube-derive types, use Resource::object_ref:
/// // let reference = myobject.object_ref();
/// recorder
///     .publish(
///         &Event {
///             action: "Scheduling".into(),
///             reason: "Pulling".into(),
///             note: Some("Pulling image `nginx`".into()),
///             type_: EventType::Normal,
///             secondary: None,
///         },
///         &reference,
///     ).await?;
/// # Ok(())
/// # }
/// ```
///
/// Events attached to an object will be shown in the `Events` section of the output
/// of `kubectl describe` for that object.
///
/// ## RBAC
///
/// Note that usage of the event recorder minimally requires the following RBAC rules:
///
/// ```yaml
/// - apiGroups: ["events.k8s.io"]
///   resources: ["events"]
///   verbs: ["create", "patch"]
/// ```
#[derive(Clone)]
pub struct Recorder {
    // Client used to create and patch events.
    client: Client,
    // Identity stamped on every event as reporting controller/instance.
    reporter: Reporter,
    // Recently published events, keyed for deduplication. Behind an `Arc`, so clones of a
    // recorder share the same cache.
    cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
}

impl Recorder {
    /// Create a new recorder that can publish events for one specific object
    ///
    /// This is intended to be created at the start of your controller's reconcile fn.
    ///
    /// Cluster scoped objects will publish events in the "default" namespace.
    #[must_use]
    pub fn new(client: Client, reporter: Reporter) -> Self {
        let cache = Arc::default();
        Self {
            client,
            reporter,
            cache,
        }
    }

    /// Builds unique event key based on reportingController, reportingInstance, regarding, reason
    ///  and note
    fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey {
        EventKey {
            event_type: ev.type_,
            action: ev.action.clone(),
            reason: ev.reason.clone(),
            reporting_controller: self.reporter.controller.clone(),
            reporting_instance: self.reporter.instance.clone(),
            regarding: Reference(regarding.clone()),
            related: ev.secondary.clone().map(Reference),
        }
    }

    // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io
    // for more detail on the fields
    // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
    fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent {
        let now = Utc::now();
        K8sEvent {
            action: Some(ev.action.clone()),
            reason: Some(ev.reason.clone()),
            deprecated_count: None,
            deprecated_first_timestamp: None,
            deprecated_last_timestamp: None,
            deprecated_source: None,
            event_time: Some(MicroTime(now)),
            regarding: Some(reference.clone()),
            note: ev.note.clone().map(Into::into),
            metadata: ObjectMeta {
                namespace: reference.namespace.clone(),
                name: Some(format!(
                    "{}.{:x}",
                    reference.name.as_ref().unwrap_or(&self.reporter.controller),
                    now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp())
                )),
                ..Default::default()
            },
            reporting_controller: Some(self.reporter.controller.clone()),
            reporting_instance: Some(
                self.reporter
                    .instance
                    .clone()
                    .unwrap_or_else(|| self.reporter.controller.clone()),
            ),
            series: None,
            type_: match ev.type_ {
                EventType::Normal => Some("Normal".into()),
                EventType::Warning => Some("Warning".into()),
            },
            related: ev.secondary.clone(),
        }
    }

    /// Publish a new Kubernetes' event.
    ///
    /// # Access control
    ///
    /// The event object is created in the same namespace of the [`ObjectReference`].
    /// Make sure that your controller has `create` permissions in the required namespaces
    /// for the `event` resource in the API group `events.k8s.io`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`](`kube_client::Error`) if the event is rejected by Kubernetes.
    pub async fn publish(&self, ev: &Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
        let now = Utc::now();

        // gc past events older than now + CACHE_TTL
        self.cache.write().await.retain(|_, v| {
            if let Some(series) = v.series.as_ref() {
                series.last_observed_time.0 + CACHE_TTL > now
            } else if let Some(event_time) = v.event_time.as_ref() {
                event_time.0 + CACHE_TTL > now
            } else {
                true
            }
        });

        let key = self.get_event_key(ev, reference);
        let event = match self.cache.read().await.get(&key) {
            Some(e) => {
                let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
                let series = EventSeries {
                    count,
                    last_observed_time: MicroTime(now),
                };
                let mut event = e.clone();
                event.series = Some(series);
                event
            }
            None => self.generate_event(ev, reference),
        };

        let events = Api::namespaced(
            self.client.clone(),
            reference.namespace.as_ref().unwrap_or(&"default".to_string()),
        );
        if event.series.is_some() {
            events
                .patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event))
                .await?;
        } else {
            events.create(&PostParams::default(), &event).await?;
        };

        {
            let mut cache = self.cache.write().await;
            cache.insert(key, event);
        }
        Ok(())
    }
}

// Integration tests for the event recorder. All of them are `#[ignore]`d because they
// talk to a live cluster through `Client::try_default`.
#[cfg(test)]
mod test {
    use super::{Event, EventKey, EventType, Recorder, Reference, Reporter};

    use k8s_openapi::{
        api::{
            core::v1::{ComponentStatus, Service},
            events::v1::Event as K8sEvent,
        },
        apimachinery::pkg::apis::meta::v1::MicroTime,
        chrono::{Duration, Utc},
    };
    use kube::{Api, Client, Resource};

    // Publishes the same event twice for a namespaced object and checks that the first
    // publish stores the note and the second one turns the event into a series.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_attaches_events() -> Result<(), Box<dyn std::error::Error>> {
        let client = Client::try_default().await?;

        let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
        let s = svcs.get("kubernetes").await?; // always a kubernetes service in default
        let recorder = Recorder::new(client.clone(), "kube".into());
        // First publish: no cache entry yet.
        recorder
            .publish(
                &Event {
                    type_: EventType::Normal,
                    reason: "VeryCoolService".into(),
                    note: Some("Sending kubernetes to detention".into()),
                    action: "Test event - plz ignore".into(),
                    secondary: None,
                },
                &s.object_ref(&()),
            )
            .await?;
        let events: Api<K8sEvent> = Api::namespaced(client, "default");

        let event_list = events.list(&Default::default()).await?;
        let found_event = event_list
            .into_iter()
            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
            .unwrap();
        assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");

        // Second publish: same type/action/reason/object, only the note differs.
        recorder
            .publish(
                &Event {
                    type_: EventType::Normal,
                    reason: "VeryCoolService".into(),
                    note: Some("Sending kubernetes to detention twice".into()),
                    action: "Test event - plz ignore".into(),
                    secondary: None,
                },
                &s.object_ref(&()),
            )
            .await?;

        let event_list = events.list(&Default::default()).await?;
        let found_event = event_list
            .into_iter()
            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
            .unwrap();
        // The repeated publish is expected to show up as a series on the event.
        assert!(found_event.series.is_some());

        Ok(())
    }

    // Same scenario as above, but for an object fetched through `Api::all`, whose events
    // are looked up in the "default" namespace.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box<dyn std::error::Error>> {
        let client = Client::try_default().await?;

        let component_status_api: Api<ComponentStatus> = Api::all(client.clone());
        let s = component_status_api.get("scheduler").await?;
        let recorder = Recorder::new(client.clone(), "kube".into());
        // First publish: no cache entry yet.
        recorder
            .publish(
                &Event {
                    type_: EventType::Normal,
                    reason: "VeryCoolServiceNoNamespace".into(),
                    note: Some("Sending kubernetes to detention without namespace".into()),
                    action: "Test event - plz ignore".into(),
                    secondary: None,
                },
                &s.object_ref(&()),
            )
            .await?;
        let events: Api<K8sEvent> = Api::namespaced(client, "default");

        let event_list = events.list(&Default::default()).await?;
        let found_event = event_list
            .into_iter()
            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
            .unwrap();
        assert_eq!(
            found_event.note.unwrap(),
            "Sending kubernetes to detention without namespace"
        );

        // Second publish: same key, different note.
        recorder
            .publish(
                &Event {
                    type_: EventType::Normal,
                    reason: "VeryCoolServiceNoNamespace".into(),
                    note: Some("Sending kubernetes to detention without namespace twice".into()),
                    action: "Test event - plz ignore".into(),
                    secondary: None,
                },
                &s.object_ref(&()),
            )
            .await?;

        let event_list = events.list(&Default::default()).await?;
        let found_event = event_list
            .into_iter()
            .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
            .unwrap();
        // The repeated publish is expected to show up as a series on the event.
        assert!(found_event.series.is_some());
        Ok(())
    }

    // Back-dates a cached event past the TTL and checks that the next publish does not
    // produce a series.
    #[tokio::test]
    #[ignore = "needs cluster (creates an event for the default kubernetes service)"]
    async fn event_recorder_cache_retain() -> Result<(), Box<dyn std::error::Error>> {
        let client = Client::try_default().await?;

        let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
        let s = svcs.get("kubernetes").await?; // always a kubernetes service in default

        let reference = s.object_ref(&());
        // Only used for its controller name when building the key below.
        let reporter: Reporter = "kube".into();
        let ev = Event {
            type_: EventType::Normal,
            reason: "TestCacheTtl".into(),
            note: Some("Sending kubernetes to detention".into()),
            action: "Test event - plz ignore".into(),
            secondary: None,
        };
        // Hand-built copy of the key the recorder will use; `reporting_instance` is `None`
        // to match the explicit `Reporter` constructed below.
        let key = EventKey {
            event_type: ev.type_,
            action: ev.action.clone(),
            reason: ev.reason.clone(),
            reporting_controller: reporter.controller.clone(),
            regarding: Reference(reference.clone()),
            reporting_instance: None,
            related: None,
        };

        let reporter = Reporter {
            controller: "kube".into(),
            instance: None,
        };
        let recorder = Recorder::new(client.clone(), reporter);

        recorder.publish(&ev, &s.object_ref(&())).await?;
        let now = Utc::now();
        // Ten minutes ago is beyond the six minute cache TTL.
        let past = now - Duration::minutes(10);
        recorder.cache.write().await.entry(key).and_modify(|e| {
            e.event_time = Some(MicroTime(past));
        });

        // The back-dated entry should be evicted before this publish looks up the cache.
        recorder.publish(&ev, &s.object_ref(&())).await?;

        let events: Api<K8sEvent> = Api::namespaced(client, "default");
        let event_list = events.list(&Default::default()).await?;
        let found_event = event_list
            .into_iter()
            .find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl")))
            .unwrap();
        assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
        // No series: the second publish was expected to create a fresh event.
        assert!(found_event.series.is_none());

        Ok(())
    }
}