kube_client/api/core_methods.rs
1use either::Either;
2use futures::Stream;
3use serde::{de::DeserializeOwned, Serialize};
4use std::fmt::Debug;
5
6use crate::{api::Api, Error, Result};
7use kube_core::{
8 metadata::PartialObjectMeta, object::ObjectList, params::*, response::Status, ErrorResponse, WatchEvent,
9};
10
11/// PUSH/PUT/POST/GET abstractions
12impl<K> Api<K>
13where
14 K: Clone + DeserializeOwned + Debug,
15{
16 /// Get a named resource
17 ///
18 /// ```no_run
19 /// # use kube::Api;
20 /// use k8s_openapi::api::core::v1::Pod;
21 ///
22 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
23 /// # let client: kube::Client = todo!();
24 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
25 /// let p: Pod = pods.get("blog").await?;
26 /// # Ok(())
27 /// # }
28 /// ```
29 ///
30 /// # Errors
31 ///
32 /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
33 /// Consider using [`Api::get_opt`] if you need to handle missing objects.
34 pub async fn get(&self, name: &str) -> Result<K> {
35 self.get_with(name, &GetParams::default()).await
36 }
37
38 /// Get only the metadata for a named resource as [`PartialObjectMeta`]
39 ///
40 /// ```no_run
41 /// use kube::{Api, core::PartialObjectMeta};
42 /// use k8s_openapi::api::core::v1::Pod;
43 ///
44 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
45 /// # let client: kube::Client = todo!();
46 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
47 /// let p: PartialObjectMeta<Pod> = pods.get_metadata("blog").await?;
48 /// # Ok(())
49 /// # }
50 /// ```
51 /// Note that the type may be converted to `ObjectMeta` through the usual
52 /// conversion traits.
53 ///
54 /// # Errors
55 ///
56 /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
57 /// Consider using [`Api::get_metadata_opt`] if you need to handle missing objects.
58 pub async fn get_metadata(&self, name: &str) -> Result<PartialObjectMeta<K>> {
59 self.get_metadata_with(name, &GetParams::default()).await
60 }
61
62 /// [Get](`Api::get`) a named resource with an explicit resourceVersion
63 ///
64 /// This function allows the caller to pass in a [`GetParams`](`super::GetParams`) type containing
65 /// a `resourceVersion` to a [Get](`Api::get`) call.
66 /// For example
67 ///
68 /// ```no_run
69 /// # use kube::{Api, api::GetParams};
70 /// use k8s_openapi::api::core::v1::Pod;
71 ///
72 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
73 /// # let client: kube::Client = todo!();
74 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
75 /// let p: Pod = pods.get_with("blog", &GetParams::any()).await?;
76 /// # Ok(())
77 /// # }
78 /// ```
79 ///
80 /// # Errors
81 ///
82 /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
83 /// Consider using [`Api::get_opt`] if you need to handle missing objects.
84 pub async fn get_with(&self, name: &str, gp: &GetParams) -> Result<K> {
85 let mut req = self.request.get(name, gp).map_err(Error::BuildRequest)?;
86 req.extensions_mut().insert("get");
87 self.client.request::<K>(req).await
88 }
89
90 /// [Get](`Api::get_metadata`) the metadata of an object using an explicit `resourceVersion`
91 ///
92 /// This function allows the caller to pass in a [`GetParams`](`super::GetParams`) type containing
93 /// a `resourceVersion` to a [Get](`Api::get_metadata`) call.
94 /// For example
95 ///
96 ///
97 /// ```no_run
98 /// use kube::{Api, api::GetParams, core::PartialObjectMeta};
99 /// use k8s_openapi::api::core::v1::Pod;
100 ///
101 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
102 /// # let client: kube::Client = todo!();
103 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
104 /// let p: PartialObjectMeta<Pod> = pods.get_metadata_with("blog", &GetParams::any()).await?;
105 /// # Ok(())
106 /// # }
107 /// ```
108 /// Note that the type may be converted to `ObjectMeta` through the usual
109 /// conversion traits.
110 ///
111 /// # Errors
112 ///
113 /// This function assumes that the object is expected to always exist, and returns [`Error`] if it does not.
114 /// Consider using [`Api::get_metadata_opt`] if you need to handle missing objects.
115 pub async fn get_metadata_with(&self, name: &str, gp: &GetParams) -> Result<PartialObjectMeta<K>> {
116 let mut req = self.request.get_metadata(name, gp).map_err(Error::BuildRequest)?;
117 req.extensions_mut().insert("get_metadata");
118 self.client.request::<PartialObjectMeta<K>>(req).await
119 }
120
121 /// [Get](`Api::get`) a named resource if it exists, returns [`None`] if it doesn't exist
122 ///
123 /// ```no_run
124 /// # use kube::Api;
125 /// use k8s_openapi::api::core::v1::Pod;
126 ///
127 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
128 /// # let client: kube::Client = todo!();
129 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
130 /// if let Some(pod) = pods.get_opt("blog").await? {
131 /// // Pod was found
132 /// } else {
133 /// // Pod was not found
134 /// }
135 /// # Ok(())
136 /// # }
137 /// ```
138 pub async fn get_opt(&self, name: &str) -> Result<Option<K>> {
139 match self.get(name).await {
140 Ok(obj) => Ok(Some(obj)),
141 Err(Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => Ok(None),
142 Err(err) => Err(err),
143 }
144 }
145
146 /// [Get Metadata](`Api::get_metadata`) for a named resource if it exists, returns [`None`] if it doesn't exit
147 ///
148 /// ```no_run
149 /// # use kube::Api;
150 /// use k8s_openapi::api::core::v1::Pod;
151 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
152 /// # let client: kube::Client = todo!();
153 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
154 /// if let Some(pod) = pods.get_metadata_opt("blog").await? {
155 /// // Pod was found
156 /// } else {
157 /// // Pod was not found
158 /// }
159 /// # Ok(())
160 /// # }
161 /// ```
162 ///
163 /// Note that [`PartialObjectMeta`] embeds the raw `ObjectMeta`.
164 pub async fn get_metadata_opt(&self, name: &str) -> Result<Option<PartialObjectMeta<K>>> {
165 match self.get_metadata(name).await {
166 Ok(meta) => Ok(Some(meta)),
167 Err(Error::Api(ErrorResponse { reason, .. })) if &reason == "NotFound" => Ok(None),
168 Err(err) => Err(err),
169 }
170 }
171
172 /// Get a list of resources
173 ///
174 /// You use this to get everything, or a subset matching fields/labels, say:
175 ///
176 /// ```no_run
177 /// use kube::api::{Api, ListParams, ResourceExt};
178 /// use k8s_openapi::api::core::v1::Pod;
179 ///
180 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
181 /// # let client: kube::Client = todo!();
182 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
183 /// let lp = ListParams::default().labels("app=blog"); // for this app only
184 /// for p in pods.list(&lp).await? {
185 /// println!("Found Pod: {}", p.name_any());
186 /// }
187 /// # Ok(())
188 /// # }
189 /// ```
190 pub async fn list(&self, lp: &ListParams) -> Result<ObjectList<K>> {
191 let mut req = self.request.list(lp).map_err(Error::BuildRequest)?;
192 req.extensions_mut().insert("list");
193 self.client.request::<ObjectList<K>>(req).await
194 }
195
196 /// Get a list of resources that contains only their metadata as
197 ///
198 /// Similar to [list](`Api::list`), you use this to get everything, or a
199 /// subset matching fields/labels. For example
200 ///
201 /// ```no_run
202 /// use kube::api::{Api, ListParams, ResourceExt};
203 /// use kube::core::{ObjectMeta, ObjectList, PartialObjectMeta};
204 /// use k8s_openapi::api::core::v1::Pod;
205 ///
206 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
207 /// # let client: kube::Client = todo!();
208 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
209 /// let lp = ListParams::default().labels("app=blog"); // for this app only
210 /// let list: ObjectList<PartialObjectMeta<Pod>> = pods.list_metadata(&lp).await?;
211 /// for p in list {
212 /// println!("Found Pod: {}", p.name_any());
213 /// }
214 /// # Ok(())
215 /// # }
216 /// ```
217 pub async fn list_metadata(&self, lp: &ListParams) -> Result<ObjectList<PartialObjectMeta<K>>> {
218 let mut req = self.request.list_metadata(lp).map_err(Error::BuildRequest)?;
219 req.extensions_mut().insert("list_metadata");
220 self.client.request::<ObjectList<PartialObjectMeta<K>>>(req).await
221 }
222
223 /// Create a resource
224 ///
225 /// This function requires a type that Serializes to `K`, which can be:
226 /// 1. Raw string YAML
227 /// - easy to port from existing files
228 /// - error prone (run-time errors on typos due to failed serialize attempts)
229 /// - very error prone (can write invalid YAML)
230 /// 2. An instance of the struct itself
231 /// - easy to instantiate for CRDs (you define the struct)
232 /// - dense to instantiate for [`k8s_openapi`] types (due to many optionals)
233 /// - compile-time safety
234 /// - but still possible to write invalid native types (validation at apiserver)
235 /// 3. [`serde_json::json!`] macro instantiated [`serde_json::Value`]
236 /// - Tradeoff between the two
237 /// - Easy partially filling of native [`k8s_openapi`] types (most fields optional)
238 /// - Partial safety against runtime errors (at least you must write valid JSON)
239 ///
240 /// Note that this method cannot write to the status object (when it exists) of a resource.
241 /// To set status objects please see [`Api::replace_status`] or [`Api::patch_status`].
242 pub async fn create(&self, pp: &PostParams, data: &K) -> Result<K>
243 where
244 K: Serialize,
245 {
246 let bytes = serde_json::to_vec(&data).map_err(Error::SerdeError)?;
247 let mut req = self.request.create(pp, bytes).map_err(Error::BuildRequest)?;
248 req.extensions_mut().insert("create");
249 self.client.request::<K>(req).await
250 }
251
252 /// Delete a named resource
253 ///
254 /// When you get a `K` via `Left`, your delete has started.
255 /// When you get a `Status` via `Right`, this should be a a 2XX style
256 /// confirmation that the object being gone.
257 ///
258 /// 4XX and 5XX status types are returned as an [`Err(kube_client::Error::Api)`](crate::Error::Api).
259 ///
260 /// ```no_run
261 /// use kube::api::{Api, DeleteParams};
262 /// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1 as apiexts;
263 /// use apiexts::CustomResourceDefinition;
264 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
265 /// # let client: kube::Client = todo!();
266 /// let crds: Api<CustomResourceDefinition> = Api::all(client);
267 /// crds.delete("foos.clux.dev", &DeleteParams::default()).await?
268 /// .map_left(|o| println!("Deleting CRD: {:?}", o.status))
269 /// .map_right(|s| println!("Deleted CRD: {:?}", s));
270 /// # Ok(())
271 /// # }
272 /// ```
273 pub async fn delete(&self, name: &str, dp: &DeleteParams) -> Result<Either<K, Status>> {
274 let mut req = self.request.delete(name, dp).map_err(Error::BuildRequest)?;
275 req.extensions_mut().insert("delete");
276 self.client.request_status::<K>(req).await
277 }
278
279 /// Delete a collection of resources
280 ///
281 /// When you get an `ObjectList<K>` via `Left`, your delete has started.
282 /// When you get a `Status` via `Right`, this should be a a 2XX style
283 /// confirmation that the object being gone.
284 ///
285 /// 4XX and 5XX status types are returned as an [`Err(kube_client::Error::Api)`](crate::Error::Api).
286 ///
287 /// ```no_run
288 /// use kube::api::{Api, DeleteParams, ListParams, ResourceExt};
289 /// use k8s_openapi::api::core::v1::Pod;
290 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
291 /// # let client: kube::Client = todo!();
292 ///
293 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
294 /// match pods.delete_collection(&DeleteParams::default(), &ListParams::default()).await? {
295 /// either::Left(list) => {
296 /// let names: Vec<_> = list.iter().map(ResourceExt::name_any).collect();
297 /// println!("Deleting collection of pods: {:?}", names);
298 /// },
299 /// either::Right(status) => {
300 /// println!("Deleted collection of pods: status={:?}", status);
301 /// }
302 /// }
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub async fn delete_collection(
307 &self,
308 dp: &DeleteParams,
309 lp: &ListParams,
310 ) -> Result<Either<ObjectList<K>, Status>> {
311 let mut req = self
312 .request
313 .delete_collection(dp, lp)
314 .map_err(Error::BuildRequest)?;
315 req.extensions_mut().insert("delete_collection");
316 self.client.request_status::<ObjectList<K>>(req).await
317 }
318
319 /// Patch a subset of a resource's properties
320 ///
321 /// Takes a [`Patch`] along with [`PatchParams`] for the call.
322 ///
323 /// ```no_run
324 /// use kube::api::{Api, PatchParams, Patch, Resource};
325 /// use k8s_openapi::api::core::v1::Pod;
326 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
327 /// # let client: kube::Client = todo!();
328 ///
329 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
330 /// let patch = serde_json::json!({
331 /// "apiVersion": "v1",
332 /// "kind": "Pod",
333 /// "metadata": {
334 /// "name": "blog"
335 /// },
336 /// "spec": {
337 /// "activeDeadlineSeconds": 5
338 /// }
339 /// });
340 /// let params = PatchParams::apply("myapp");
341 /// let patch = Patch::Apply(&patch);
342 /// let o_patched = pods.patch("blog", ¶ms, &patch).await?;
343 /// # Ok(())
344 /// # }
345 /// ```
346 /// [`Patch`]: super::Patch
347 /// [`PatchParams`]: super::PatchParams
348 ///
349 /// Note that this method cannot write to the status object (when it exists) of a resource.
350 /// To set status objects please see [`Api::replace_status`] or [`Api::patch_status`].
351 pub async fn patch<P: Serialize + Debug>(
352 &self,
353 name: &str,
354 pp: &PatchParams,
355 patch: &Patch<P>,
356 ) -> Result<K> {
357 let mut req = self.request.patch(name, pp, patch).map_err(Error::BuildRequest)?;
358 req.extensions_mut().insert("patch");
359 self.client.request::<K>(req).await
360 }
361
362 /// Patch a metadata subset of a resource's properties from [`PartialObjectMeta`]
363 ///
364 /// Takes a [`Patch`] along with [`PatchParams`] for the call.
365 /// Patches can be constructed raw using `serde_json::json!` or from `ObjectMeta` via [`PartialObjectMetaExt`].
366 ///
367 /// ```no_run
368 /// use kube::api::{Api, PatchParams, Patch, Resource};
369 /// use kube::core::{PartialObjectMetaExt, ObjectMeta};
370 /// use k8s_openapi::api::core::v1::Pod;
371 ///
372 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
373 /// # let client: kube::Client = todo!();
374 /// let pods: Api<Pod> = Api::namespaced(client, "apps");
375 /// let metadata = ObjectMeta {
376 /// labels: Some([("key".to_string(), "value".to_string())].into()),
377 /// ..Default::default()
378 /// }.into_request_partial::<Pod>();
379 ///
380 /// let params = PatchParams::apply("myapp");
381 /// let o_patched = pods.patch_metadata("blog", ¶ms, &Patch::Apply(&metadata)).await?;
382 /// println!("Patched {}", o_patched.metadata.name.unwrap());
383 /// # Ok(())
384 /// # }
385 /// ```
386 /// [`Patch`]: super::Patch
387 /// [`PatchParams`]: super::PatchParams
388 /// [`PartialObjectMetaExt`]: crate::core::PartialObjectMetaExt
389 ///
390 /// ### Warnings
391 ///
392 /// The `TypeMeta` (apiVersion + kind) of a patch request (required for apply patches)
393 /// must match the underlying type that is being patched (e.g. "v1" + "Pod").
394 /// The returned `TypeMeta` will always be {"meta.k8s.io/v1", "PartialObjectMetadata"}.
395 /// These constraints are encoded into [`PartialObjectMetaExt`].
396 ///
397 /// This method can write to non-metadata fields such as spec if included in the patch.
398 pub async fn patch_metadata<P: Serialize + Debug>(
399 &self,
400 name: &str,
401 pp: &PatchParams,
402 patch: &Patch<P>,
403 ) -> Result<PartialObjectMeta<K>> {
404 let mut req = self
405 .request
406 .patch_metadata(name, pp, patch)
407 .map_err(Error::BuildRequest)?;
408 req.extensions_mut().insert("patch_metadata");
409 self.client.request::<PartialObjectMeta<K>>(req).await
410 }
411
412 /// Replace a resource entirely with a new one
413 ///
414 /// This is used just like [`Api::create`], but with one additional instruction:
415 /// You must set `metadata.resourceVersion` in the provided data because k8s
416 /// will not accept an update unless you actually knew what the last version was.
417 ///
418 /// Thus, to use this function, you need to do a `get` then a `replace` with its result.
419 ///
420 /// ```no_run
421 /// use kube::api::{Api, PostParams, ResourceExt};
422 /// use k8s_openapi::api::batch::v1::Job;
423 ///
424 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
425 /// # let client: kube::Client = todo!();
426 /// let jobs: Api<Job> = Api::namespaced(client, "apps");
427 /// let j = jobs.get("baz").await?;
428 /// let j_new: Job = serde_json::from_value(serde_json::json!({
429 /// "apiVersion": "batch/v1",
430 /// "kind": "Job",
431 /// "metadata": {
432 /// "name": "baz",
433 /// "resourceVersion": j.resource_version(),
434 /// },
435 /// "spec": {
436 /// "template": {
437 /// "metadata": {
438 /// "name": "empty-job-pod"
439 /// },
440 /// "spec": {
441 /// "containers": [{
442 /// "name": "empty",
443 /// "image": "alpine:latest"
444 /// }],
445 /// "restartPolicy": "Never",
446 /// }
447 /// }
448 /// }
449 /// }))?;
450 /// jobs.replace("baz", &PostParams::default(), &j_new).await?;
451 /// # Ok(())
452 /// # }
453 /// ```
454 ///
455 /// Consider mutating the result of `api.get` rather than recreating it.
456 ///
457 /// Note that this method cannot write to the status object (when it exists) of a resource.
458 /// To set status objects please see [`Api::replace_status`] or [`Api::patch_status`].
459 pub async fn replace(&self, name: &str, pp: &PostParams, data: &K) -> Result<K>
460 where
461 K: Serialize,
462 {
463 let bytes = serde_json::to_vec(&data).map_err(Error::SerdeError)?;
464 let mut req = self
465 .request
466 .replace(name, pp, bytes)
467 .map_err(Error::BuildRequest)?;
468 req.extensions_mut().insert("replace");
469 self.client.request::<K>(req).await
470 }
471
472 /// Watch a list of resources
473 ///
474 /// This returns a future that awaits the initial response,
475 /// then you can stream the remaining buffered `WatchEvent` objects.
476 ///
477 /// Note that a `watch` call can terminate for many reasons (even before the specified
478 /// [`WatchParams::timeout`] is triggered), and will have to be re-issued
479 /// with the last seen resource version when or if it closes.
480 ///
481 /// Consider using a managed [`watcher`] to deal with automatic re-watches and error cases.
482 ///
483 /// ```no_run
484 /// use kube::api::{Api, WatchParams, ResourceExt, WatchEvent};
485 /// use k8s_openapi::api::batch::v1::Job;
486 /// use futures::{StreamExt, TryStreamExt};
487 ///
488 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
489 /// # let client: kube::Client = todo!();
490 /// let jobs: Api<Job> = Api::namespaced(client, "apps");
491 /// let lp = WatchParams::default()
492 /// .fields("metadata.name=my_job")
493 /// .timeout(20); // upper bound of how long we watch for
494 /// let mut stream = jobs.watch(&lp, "0").await?.boxed();
495 /// while let Some(status) = stream.try_next().await? {
496 /// match status {
497 /// WatchEvent::Added(s) => println!("Added {}", s.name_any()),
498 /// WatchEvent::Modified(s) => println!("Modified: {}", s.name_any()),
499 /// WatchEvent::Deleted(s) => println!("Deleted {}", s.name_any()),
500 /// WatchEvent::Bookmark(s) => {},
501 /// WatchEvent::Error(s) => println!("{}", s),
502 /// }
503 /// }
504 /// # Ok(())
505 /// # }
506 /// ```
507 /// [`WatchParams::timeout`]: super::WatchParams::timeout
508 /// [`watcher`]: https://docs.rs/kube_runtime/*/kube_runtime/watcher/fn.watcher.html
509 pub async fn watch(
510 &self,
511 wp: &WatchParams,
512 version: &str,
513 ) -> Result<impl Stream<Item = Result<WatchEvent<K>>>> {
514 let mut req = self.request.watch(wp, version).map_err(Error::BuildRequest)?;
515 req.extensions_mut().insert("watch");
516 self.client.request_events::<K>(req).await
517 }
518
519 /// Watch a list of metadata for a given resources
520 ///
521 /// This returns a future that awaits the initial response,
522 /// then you can stream the remaining buffered `WatchEvent` objects.
523 ///
524 /// Note that a `watch_metadata` call can terminate for many reasons (even
525 /// before the specified [`WatchParams::timeout`] is triggered), and will
526 /// have to be re-issued with the last seen resource version when or if it
527 /// closes.
528 ///
529 /// Consider using a managed [`metadata_watcher`] to deal with automatic re-watches and error cases.
530 ///
531 /// ```no_run
532 /// use kube::api::{Api, WatchParams, ResourceExt, WatchEvent};
533 /// use k8s_openapi::api::batch::v1::Job;
534 /// use futures::{StreamExt, TryStreamExt};
535 ///
536 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
537 /// # let client: kube::Client = todo!();
538 /// let jobs: Api<Job> = Api::namespaced(client, "apps");
539 ///
540 /// let lp = WatchParams::default()
541 /// .fields("metadata.name=my_job")
542 /// .timeout(20); // upper bound of how long we watch for
543 /// let mut stream = jobs.watch(&lp, "0").await?.boxed();
544 /// while let Some(status) = stream.try_next().await? {
545 /// match status {
546 /// WatchEvent::Added(s) => println!("Added {}", s.metadata.name.unwrap()),
547 /// WatchEvent::Modified(s) => println!("Modified: {}", s.metadata.name.unwrap()),
548 /// WatchEvent::Deleted(s) => println!("Deleted {}", s.metadata.name.unwrap()),
549 /// WatchEvent::Bookmark(s) => {},
550 /// WatchEvent::Error(s) => println!("{}", s),
551 /// }
552 /// }
553 /// # Ok(())
554 /// # }
555 /// ```
556 /// [`WatchParams::timeout`]: super::WatchParams::timeout
557 /// [`metadata_watcher`]: https://docs.rs/kube_runtime/*/kube_runtime/watcher/fn.metadata_watcher.html
558 pub async fn watch_metadata(
559 &self,
560 wp: &WatchParams,
561 version: &str,
562 ) -> Result<impl Stream<Item = Result<WatchEvent<PartialObjectMeta<K>>>>> {
563 let mut req = self
564 .request
565 .watch_metadata(wp, version)
566 .map_err(Error::BuildRequest)?;
567 req.extensions_mut().insert("watch_metadata");
568 self.client.request_events::<PartialObjectMeta<K>>(req).await
569 }
570}