async_nats/service/
mod.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14pub mod error;
15
16use std::{
17    collections::HashMap,
18    fmt::Display,
19    pin::Pin,
20    sync::{Arc, Mutex},
21    time::{Duration, Instant},
22};
23
24use bytes::Bytes;
25pub mod endpoint;
26use futures::{
27    stream::{self, SelectAll},
28    Future, StreamExt,
29};
30use once_cell::sync::Lazy;
31use regex::Regex;
32use serde::{Deserialize, Serialize};
33use time::serde::rfc3339;
34use time::OffsetDateTime;
35use tokio::{sync::broadcast::Sender, task::JoinHandle};
36use tracing::debug;
37
38use crate::{Client, Error, HeaderMap, Message, PublishError, Subscriber};
39
40use self::endpoint::Endpoint;
41
42const SERVICE_API_PREFIX: &str = "$SRV";
43const DEFAULT_QUEUE_GROUP: &str = "q";
44pub const NATS_SERVICE_ERROR: &str = "Nats-Service-Error";
45pub const NATS_SERVICE_ERROR_CODE: &str = "Nats-Service-Error-Code";
46
47// uses recommended semver validation expression from
48// https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
49static SEMVER: Lazy<Regex> = Lazy::new(|| {
50    Regex::new(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$")
51        .unwrap()
52});
53// From ADR-33: Name can only have A-Z, a-z, 0-9, dash, underscore.
54static NAME: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap());
55
56/// Represents state for all endpoints.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub(crate) struct Endpoints {
59    pub(crate) endpoints: HashMap<String, endpoint::Inner>,
60}
61
62/// Response for `PING` requests.
63#[derive(Serialize, Deserialize)]
64pub struct PingResponse {
65    /// Response type.
66    #[serde(rename = "type")]
67    pub kind: String,
68    /// Service name.
69    pub name: String,
70    /// Service id.
71    pub id: String,
72    /// Service version.
73    pub version: String,
74    /// Additional metadata
75    #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
76    pub metadata: HashMap<String, String>,
77}
78
79/// Response for `STATS` requests.
80#[derive(Serialize, Deserialize)]
81pub struct Stats {
82    /// Response type.
83    #[serde(rename = "type")]
84    pub kind: String,
85    /// Service name.
86    pub name: String,
87    /// Service id.
88    pub id: String,
89    // Service version.
90    pub version: String,
91    #[serde(with = "rfc3339")]
92    pub started: OffsetDateTime,
93    /// Statistics of all endpoints.
94    pub endpoints: Vec<endpoint::Stats>,
95}
96
97/// Information about service instance.
98/// Service name.
99#[derive(Serialize, Deserialize, Debug, Clone)]
100pub struct Info {
101    /// Response type.
102    #[serde(rename = "type")]
103    pub kind: String,
104    /// Service name.
105    pub name: String,
106    /// Service id.
107    pub id: String,
108    /// Service description.
109    pub description: String,
110    /// Service version.
111    pub version: String,
112    /// Additional metadata
113    #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
114    pub metadata: HashMap<String, String>,
115    /// Info about all service endpoints.
116    pub endpoints: Vec<endpoint::Info>,
117}
118
119/// Configuration of the [Service].
120#[derive(Serialize, Deserialize, Debug)]
121pub struct Config {
122    /// Really the kind of the service. Shared by all the services that have the same name.
123    /// This name can only have A-Z, a-z, 0-9, dash, underscore
124    pub name: String,
125    /// a human-readable description about the service
126    pub description: Option<String>,
127    /// A SemVer valid service version.
128    pub version: String,
129    /// Custom handler for providing the `EndpointStats.data` value.
130    #[serde(skip)]
131    pub stats_handler: Option<StatsHandler>,
132    /// Additional service metadata
133    pub metadata: Option<HashMap<String, String>>,
134    /// Custom queue group config
135    pub queue_group: Option<String>,
136}
137
138pub struct ServiceBuilder {
139    client: Client,
140    description: Option<String>,
141    stats_handler: Option<StatsHandler>,
142    metadata: Option<HashMap<String, String>>,
143    queue_group: Option<String>,
144}
145
146impl ServiceBuilder {
147    fn new(client: Client) -> Self {
148        Self {
149            client,
150            description: None,
151            stats_handler: None,
152            metadata: None,
153            queue_group: None,
154        }
155    }
156
157    /// Description for the service.
158    pub fn description<S: ToString>(mut self, description: S) -> Self {
159        self.description = Some(description.to_string());
160        self
161    }
162
163    /// Handler for custom service statistics.
164    pub fn stats_handler<F>(mut self, handler: F) -> Self
165    where
166        F: FnMut(String, endpoint::Stats) -> serde_json::Value + Send + Sync + 'static,
167    {
168        self.stats_handler = Some(StatsHandler(Box::new(handler)));
169        self
170    }
171
172    /// Additional service metadata.
173    pub fn metadata(mut self, metadata: HashMap<String, String>) -> Self {
174        self.metadata = Some(metadata);
175        self
176    }
177
178    /// Custom queue group. Default is `q`.
179    pub fn queue_group<S: ToString>(mut self, queue_group: S) -> Self {
180        self.queue_group = Some(queue_group.to_string());
181        self
182    }
183
184    /// Starts the service with configured options.
185    pub async fn start<S: ToString>(self, name: S, version: S) -> Result<Service, Error> {
186        Service::add(
187            self.client,
188            Config {
189                name: name.to_string(),
190                version: version.to_string(),
191                description: self.description,
192                stats_handler: self.stats_handler,
193                metadata: self.metadata,
194                queue_group: self.queue_group,
195            },
196        )
197        .await
198    }
199}
200
201/// Verbs that can be used to acquire information from the services.
202pub enum Verb {
203    Ping,
204    Stats,
205    Info,
206    Schema,
207}
208
209impl Display for Verb {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        match self {
212            Verb::Ping => write!(f, "PING"),
213            Verb::Stats => write!(f, "STATS"),
214            Verb::Info => write!(f, "INFO"),
215            Verb::Schema => write!(f, "SCHEMA"),
216        }
217    }
218}
219
220pub trait ServiceExt {
221    type Output: Future<Output = Result<Service, crate::Error>>;
222
223    /// Adds a Service instance.
224    ///
225    /// # Examples
226    ///
227    /// ```no_run
228    /// # #[tokio::main]
229    /// # async fn main() -> Result<(), async_nats::Error> {
230    /// use async_nats::service::ServiceExt;
231    /// use futures::StreamExt;
232    /// let client = async_nats::connect("demo.nats.io").await?;
233    /// let mut service = client
234    ///     .add_service(async_nats::service::Config {
235    ///         name: "generator".to_string(),
236    ///         version: "1.0.0".to_string(),
237    ///         description: None,
238    ///         stats_handler: None,
239    ///         metadata: None,
240    ///         queue_group: None,
241    ///     })
242    ///     .await?;
243    ///
244    /// let mut endpoint = service.endpoint("get").await?;
245    ///
246    /// if let Some(request) = endpoint.next().await {
247    ///     request.respond(Ok("hello".into())).await?;
248    /// }
249    ///
250    /// # Ok(())
251    /// # }
252    /// ```
253    fn add_service(&self, config: Config) -> Self::Output;
254
255    /// Returns Service instance builder.
256    ///
257    /// # Examples
258    ///
259    /// ```no_run
260    /// # #[tokio::main]
261    /// # async fn main() -> Result<(), async_nats::Error> {
262    /// use async_nats::service::ServiceExt;
263    /// use futures::StreamExt;
264    /// let client = async_nats::connect("demo.nats.io").await?;
265    /// let mut service = client
266    ///     .service_builder()
267    ///     .description("some service")
268    ///     .stats_handler(|endpoint, stats| serde_json::json!({ "endpoint": endpoint }))
269    ///     .start("products", "1.0.0")
270    ///     .await?;
271    ///
272    /// let mut endpoint = service.endpoint("get").await?;
273    ///
274    /// if let Some(request) = endpoint.next().await {
275    ///     request.respond(Ok("hello".into())).await?;
276    /// }
277    /// # Ok(())
278    /// # }
279    /// ```
280    fn service_builder(&self) -> ServiceBuilder;
281}
282
283impl ServiceExt for crate::Client {
284    type Output = Pin<Box<dyn Future<Output = Result<Service, crate::Error>> + Send>>;
285
286    fn add_service(&self, config: Config) -> Self::Output {
287        let client = self.clone();
288        Box::pin(async { Service::add(client, config).await })
289    }
290
291    fn service_builder(&self) -> ServiceBuilder {
292        ServiceBuilder::new(self.clone())
293    }
294}
295
296/// Service instance.
297///
298/// # Examples
299///
300/// ```no_run
301/// # #[tokio::main]
302/// # async fn main() -> Result<(), async_nats::Error> {
303/// use async_nats::service::ServiceExt;
304/// use futures::StreamExt;
305/// let client = async_nats::connect("demo.nats.io").await?;
306/// let mut service = client.service_builder().start("generator", "1.0.0").await?;
307/// let mut endpoint = service.endpoint("get").await?;
308///
309/// if let Some(request) = endpoint.next().await {
310///     request.respond(Ok("hello".into())).await?;
311/// }
312///
313/// # Ok(())
314/// # }
315/// ```
316#[derive(Debug)]
317pub struct Service {
318    endpoints_state: Arc<Mutex<Endpoints>>,
319    info: Info,
320    client: Client,
321    handle: JoinHandle<Result<(), Error>>,
322    shutdown_tx: tokio::sync::broadcast::Sender<()>,
323    subjects: Arc<Mutex<Vec<String>>>,
324    queue_group: String,
325}
326
327impl Service {
328    async fn add(client: Client, config: Config) -> Result<Service, Error> {
329        // validate service version semver string.
330        if !SEMVER.is_match(config.version.as_str()) {
331            return Err(Box::new(std::io::Error::new(
332                std::io::ErrorKind::InvalidInput,
333                "service version is not a valid semver string",
334            )));
335        }
336        // validate service name.
337        if !NAME.is_match(config.name.as_str()) {
338            return Err(Box::new(std::io::Error::new(
339                std::io::ErrorKind::InvalidInput,
340                "service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
341            )));
342        }
343        let endpoints_state = Arc::new(Mutex::new(Endpoints {
344            endpoints: HashMap::new(),
345        }));
346
347        let queue_group = config
348            .queue_group
349            .unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
350        let id = nuid::next().to_string();
351        let started = time::OffsetDateTime::now_utc();
352        let subjects = Arc::new(Mutex::new(Vec::new()));
353        let info = Info {
354            kind: "io.nats.micro.v1.info_response".to_string(),
355            name: config.name.clone(),
356            id: id.clone(),
357            description: config.description.clone().unwrap_or_default(),
358            version: config.version.clone(),
359            metadata: config.metadata.clone().unwrap_or_default(),
360            endpoints: Vec::new(),
361        };
362
363        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
364
365        // create subscriptions for all verbs.
366        let mut pings =
367            verb_subscription(client.clone(), Verb::Ping, config.name.clone(), id.clone()).await?;
368        let mut infos =
369            verb_subscription(client.clone(), Verb::Info, config.name.clone(), id.clone()).await?;
370        let mut stats =
371            verb_subscription(client.clone(), Verb::Stats, config.name.clone(), id.clone()).await?;
372
373        // Start a task for handling verbs subscriptions.
374        let handle = tokio::task::spawn({
375            let mut stats_callback = config.stats_handler;
376            let info = info.clone();
377            let endpoints_state = endpoints_state.clone();
378            let client = client.clone();
379            async move {
380                loop {
381                    tokio::select! {
382                        Some(ping) = pings.next() => {
383                            let pong = serde_json::to_vec(&PingResponse{
384                                kind: "io.nats.micro.v1.ping_response".to_string(),
385                                name: info.name.clone(),
386                                id: info.id.clone(),
387                                version: info.version.clone(),
388                                metadata: info.metadata.clone(),
389                            })?;
390                            client.publish(ping.reply.unwrap(), pong.into()).await?;
391                        },
392                        Some(info_request) = infos.next() => {
393                            let info = info.clone();
394
395                            let endpoints: Vec<endpoint::Info> = {
396                                endpoints_state.lock().unwrap().endpoints.values().map(|value| {
397                                    endpoint::Info {
398                                        name: value.name.to_owned(),
399                                        subject: value.subject.to_owned(),
400                                        queue_group: value.queue_group.to_owned(),
401                                        metadata: value.metadata.to_owned()
402                                    }
403                                }).collect()
404                            };
405                            let info = Info {
406                                endpoints,
407                                ..info
408                            };
409                            let info_json = serde_json::to_vec(&info).map(Bytes::from)?;
410                            client.publish(info_request.reply.unwrap(), info_json.clone()).await?;
411                        },
412                        Some(stats_request) = stats.next() => {
413                            if let Some(stats_callback) = stats_callback.as_mut() {
414                                let mut endpoint_stats_locked = endpoints_state.lock().unwrap();
415                                for (key, value) in &mut endpoint_stats_locked.endpoints {
416                                    let data = stats_callback.0(key.to_string(), value.clone().into());
417                                    value.data = Some(data);
418                                }
419                            }
420                            let stats = serde_json::to_vec(&Stats {
421                                kind: "io.nats.micro.v1.stats_response".to_string(),
422                                name: info.name.clone(),
423                                id: info.id.clone(),
424                                version: info.version.clone(),
425                                started,
426                                endpoints: endpoints_state.lock().unwrap().endpoints.values().cloned().map(Into::into).collect(),
427                            })?;
428                            client.publish(stats_request.reply.unwrap(), stats.into()).await?;
429                        },
430                        else => break,
431                    }
432                }
433                Ok(())
434            }
435        });
436        Ok(Service {
437            endpoints_state,
438            info,
439            client,
440            handle,
441            shutdown_tx,
442            subjects,
443            queue_group,
444        })
445    }
446    /// Stops this instance of the [Service].
447    /// If there are more instances of [Services][Service] with the same name, the [Service] will
448    /// be scaled down by one instance. If it was the only running instance, it will effectively
449    /// remove the service entirely.
450    pub async fn stop(self) -> Result<(), Error> {
451        self.shutdown_tx.send(())?;
452        self.handle.abort();
453        Ok(())
454    }
455
456    /// Resets [Stats] of the [Service] instance.
457    pub async fn reset(&mut self) {
458        for value in self.endpoints_state.lock().unwrap().endpoints.values_mut() {
459            value.errors = 0;
460            value.processing_time = Duration::default();
461            value.requests = 0;
462            value.average_processing_time = Duration::default();
463        }
464    }
465
466    /// Returns [Stats] for this service instance.
467    pub async fn stats(&self) -> HashMap<String, endpoint::Stats> {
468        self.endpoints_state
469            .lock()
470            .unwrap()
471            .endpoints
472            .iter()
473            .map(|(key, value)| (key.to_owned(), value.to_owned().into()))
474            .collect()
475    }
476
477    /// Returns [Info] for this service instance.
478    pub async fn info(&self) -> Info {
479        self.info.clone()
480    }
481
482    /// Creates a group for endpoints under common prefix.
483    ///
484    /// # Examples
485    ///
486    /// ```no_run
487    /// # #[tokio::main]
488    /// # async fn main() -> Result<(), async_nats::Error> {
489    /// use async_nats::service::ServiceExt;
490    /// let client = async_nats::connect("demo.nats.io").await?;
491    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
492    ///
493    /// let v1 = service.group("v1");
494    /// let products = v1.endpoint("products").await?;
495    /// # Ok(())
496    /// # }
497    /// ```
498    pub fn group<S: ToString>(&self, prefix: S) -> Group {
499        self.group_with_queue_group(prefix, self.queue_group.clone())
500    }
501
502    /// Creates a group for endpoints under common prefix with custom queue group.
503    ///
504    /// # Examples
505    ///
506    /// ```no_run
507    /// # #[tokio::main]
508    /// # async fn main() -> Result<(), async_nats::Error> {
509    /// use async_nats::service::ServiceExt;
510    /// let client = async_nats::connect("demo.nats.io").await?;
511    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
512    ///
513    /// let v1 = service.group("v1");
514    /// let products = v1.endpoint("products").await?;
515    /// # Ok(())
516    /// # }
517    /// ```
518    pub fn group_with_queue_group<S: ToString, Z: ToString>(
519        &self,
520        prefix: S,
521        queue_group: Z,
522    ) -> Group {
523        Group {
524            subjects: self.subjects.clone(),
525            prefix: prefix.to_string(),
526            stats: self.endpoints_state.clone(),
527            client: self.client.clone(),
528            shutdown_tx: self.shutdown_tx.clone(),
529            queue_group: queue_group.to_string(),
530        }
531    }
532
533    /// Builder for customized [Endpoint] creation.
534    ///
535    /// # Examples
536    ///
537    /// ```no_run
538    /// # #[tokio::main]
539    /// # async fn main() -> Result<(), async_nats::Error> {
540    /// use async_nats::service::ServiceExt;
541    /// let client = async_nats::connect("demo.nats.io").await?;
542    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
543    ///
544    /// let products = service
545    ///     .endpoint_builder()
546    ///     .name("api")
547    ///     .add("products")
548    ///     .await?;
549    /// # Ok(())
550    /// # }
551    /// ```
552    pub fn endpoint_builder(&self) -> EndpointBuilder {
553        EndpointBuilder::new(
554            self.client.clone(),
555            self.endpoints_state.clone(),
556            self.shutdown_tx.clone(),
557            self.subjects.clone(),
558            self.queue_group.clone(),
559        )
560    }
561
562    /// Adds a new endpoint to the [Service].
563    ///
564    /// # Examples
565    ///
566    /// ```no_run
567    /// # #[tokio::main]
568    /// # async fn main() -> Result<(), async_nats::Error> {
569    /// use async_nats::service::ServiceExt;
570    /// let client = async_nats::connect("demo.nats.io").await?;
571    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
572    ///
573    /// let products = service.endpoint("products").await?;
574    /// # Ok(())
575    /// # }
576    /// ```
577    pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
578        EndpointBuilder::new(
579            self.client.clone(),
580            self.endpoints_state.clone(),
581            self.shutdown_tx.clone(),
582            self.subjects.clone(),
583            self.queue_group.clone(),
584        )
585        .add(subject)
586        .await
587    }
588}
589
590pub struct Group {
591    prefix: String,
592    stats: Arc<Mutex<Endpoints>>,
593    client: Client,
594    shutdown_tx: tokio::sync::broadcast::Sender<()>,
595    subjects: Arc<Mutex<Vec<String>>>,
596    queue_group: String,
597}
598
599impl Group {
600    /// Creates a group for [Endpoints][Endpoint] under common prefix.
601    ///
602    /// # Examples
603    ///
604    /// ```no_run
605    /// # #[tokio::main]
606    /// # async fn main() -> Result<(), async_nats::Error> {
607    /// use async_nats::service::ServiceExt;
608    /// let client = async_nats::connect("demo.nats.io").await?;
609    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
610    ///
611    /// let v1 = service.group("v1");
612    /// let products = v1.endpoint("products").await?;
613    /// # Ok(())
614    /// # }
615    /// ```
616    pub fn group<S: ToString>(&self, prefix: S) -> Group {
617        self.group_with_queue_group(prefix, self.queue_group.clone())
618    }
619
620    /// Creates a group for [Endpoints][Endpoint] under common prefix with custom queue group.
621    ///
622    /// # Examples
623    ///
624    /// ```no_run
625    /// # #[tokio::main]
626    /// # async fn main() -> Result<(), async_nats::Error> {
627    /// use async_nats::service::ServiceExt;
628    /// let client = async_nats::connect("demo.nats.io").await?;
629    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
630    ///
631    /// let v1 = service.group("v1");
632    /// let products = v1.endpoint("products").await?;
633    /// # Ok(())
634    /// # }
635    /// ```
636    pub fn group_with_queue_group<S: ToString, Z: ToString>(
637        &self,
638        prefix: S,
639        queue_group: Z,
640    ) -> Group {
641        Group {
642            prefix: format!("{}.{}", self.prefix, prefix.to_string()),
643            stats: self.stats.clone(),
644            client: self.client.clone(),
645            shutdown_tx: self.shutdown_tx.clone(),
646            subjects: self.subjects.clone(),
647            queue_group: queue_group.to_string(),
648        }
649    }
650
651    /// Adds a new endpoint to the [Service] under current [Group]
652    ///
653    /// # Examples
654    ///
655    /// ```no_run
656    /// # #[tokio::main]
657    /// # async fn main() -> Result<(), async_nats::Error> {
658    /// use async_nats::service::ServiceExt;
659    /// let client = async_nats::connect("demo.nats.io").await?;
660    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
661    /// let v1 = service.group("v1");
662    ///
663    /// let products = v1.endpoint("products").await?;
664    /// # Ok(())
665    /// # }
666    /// ```
667    pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
668        let mut endpoint = EndpointBuilder::new(
669            self.client.clone(),
670            self.stats.clone(),
671            self.shutdown_tx.clone(),
672            self.subjects.clone(),
673            self.queue_group.clone(),
674        );
675        endpoint.prefix = Some(self.prefix.clone());
676        endpoint.add(subject.to_string()).await
677    }
678
679    /// Builder for customized [Endpoint] creation under current [Group]
680    ///
681    /// # Examples
682    ///
683    /// ```no_run
684    /// # #[tokio::main]
685    /// # async fn main() -> Result<(), async_nats::Error> {
686    /// use async_nats::service::ServiceExt;
687    /// let client = async_nats::connect("demo.nats.io").await?;
688    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
689    /// let v1 = service.group("v1");
690    ///
691    /// let products = v1.endpoint_builder().name("api").add("products").await?;
692    /// # Ok(())
693    /// # }
694    /// ```
695    pub fn endpoint_builder(&self) -> EndpointBuilder {
696        let mut endpoint = EndpointBuilder::new(
697            self.client.clone(),
698            self.stats.clone(),
699            self.shutdown_tx.clone(),
700            self.subjects.clone(),
701            self.queue_group.clone(),
702        );
703        endpoint.prefix = Some(self.prefix.clone());
704        endpoint
705    }
706}
707
708async fn verb_subscription(
709    client: Client,
710    verb: Verb,
711    name: String,
712    id: String,
713) -> Result<futures::stream::Fuse<SelectAll<Subscriber>>, Error> {
714    let verb_all = client
715        .subscribe(format!("{SERVICE_API_PREFIX}.{verb}"))
716        .await?;
717    let verb_name = client
718        .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}"))
719        .await?;
720    let verb_id = client
721        .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}.{id}"))
722        .await?;
723    Ok(stream::select_all([verb_all, verb_id, verb_name]).fuse())
724}
725
726type ShutdownReceiverFuture = Pin<
727    Box<dyn Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>> + Send + Sync>,
728>;
729
730/// Request returned by [Service] [Stream][futures::Stream].
731#[derive(Debug)]
732pub struct Request {
733    issued: Instant,
734    client: Client,
735    pub message: Message,
736    endpoint: String,
737    stats: Arc<Mutex<Endpoints>>,
738}
739
740impl Request {
741    /// Sends response for the request.
742    ///
743    /// # Examples
744    ///
745    /// ```no_run
746    /// # #[tokio::main]
747    /// # async fn main() -> Result<(), async_nats::Error> {
748    /// use async_nats::service::ServiceExt;
749    /// use futures::StreamExt;
750    /// # let client = async_nats::connect("demo.nats.io").await?;
751    /// # let mut service = client
752    /// #    .service_builder().start("serviceA", "1.0.0.1").await?;
753    /// let mut endpoint = service.endpoint("endpoint").await?;
754    /// let request = endpoint.next().await.unwrap();
755    /// request.respond(Ok("hello".into())).await?;
756    /// # Ok(())
757    /// # }
758    /// ```
759    pub async fn respond(&self, response: Result<Bytes, error::Error>) -> Result<(), PublishError> {
760        let reply = self.message.reply.clone().unwrap();
761        let result = match response {
762            Ok(payload) => self.client.publish(reply, payload).await,
763            Err(err) => {
764                self.stats
765                    .lock()
766                    .unwrap()
767                    .endpoints
768                    .entry(self.endpoint.clone())
769                    .and_modify(|stats| {
770                        stats.last_error = Some(err.clone());
771                        stats.errors += 1;
772                    })
773                    .or_default();
774                let mut headers = HeaderMap::new();
775                headers.insert(NATS_SERVICE_ERROR, err.status.as_str());
776                headers.insert(NATS_SERVICE_ERROR_CODE, err.code.to_string().as_str());
777                self.client
778                    .publish_with_headers(reply, headers, "".into())
779                    .await
780            }
781        };
782        let elapsed = self.issued.elapsed();
783        let mut stats = self.stats.lock().unwrap();
784        let stats = stats.endpoints.get_mut(self.endpoint.as_str()).unwrap();
785        stats.requests += 1;
786        stats.processing_time += elapsed;
787        stats.average_processing_time = {
788            let avg_nanos = (stats.processing_time.as_nanos() / stats.requests as u128) as u64;
789            Duration::from_nanos(avg_nanos)
790        };
791        result
792    }
793}
794
795#[derive(Debug)]
796pub struct EndpointBuilder {
797    client: Client,
798    stats: Arc<Mutex<Endpoints>>,
799    shutdown_tx: Sender<()>,
800    name: Option<String>,
801    metadata: Option<HashMap<String, String>>,
802    subjects: Arc<Mutex<Vec<String>>>,
803    queue_group: String,
804    prefix: Option<String>,
805}
806
807impl EndpointBuilder {
808    fn new(
809        client: Client,
810        stats: Arc<Mutex<Endpoints>>,
811        shutdown_tx: Sender<()>,
812        subjects: Arc<Mutex<Vec<String>>>,
813        queue_group: String,
814    ) -> EndpointBuilder {
815        EndpointBuilder {
816            client,
817            stats,
818            subjects,
819            shutdown_tx,
820            name: None,
821            metadata: None,
822            queue_group,
823            prefix: None,
824        }
825    }
826
827    /// Name of the [Endpoint]. By default subject of the endpoint is used.
828    pub fn name<S: ToString>(mut self, name: S) -> EndpointBuilder {
829        self.name = Some(name.to_string());
830        self
831    }
832
833    /// Metadata specific for the [Endpoint].
834    pub fn metadata(mut self, metadata: HashMap<String, String>) -> EndpointBuilder {
835        self.metadata = Some(metadata);
836        self
837    }
838
839    /// Custom queue group for the [Endpoint]. Otherwise it will be derived from group or service.
840    pub fn queue_group<S: ToString>(mut self, queue_group: S) -> EndpointBuilder {
841        self.queue_group = queue_group.to_string();
842        self
843    }
844
845    /// Finalizes the builder and adds the [Endpoint].
846    pub async fn add<S: ToString>(self, subject: S) -> Result<Endpoint, Error> {
847        let mut subject = subject.to_string();
848        if let Some(prefix) = self.prefix {
849            subject = format!("{}.{}", prefix, subject);
850        }
851        let endpoint_name = self.name.clone().unwrap_or_else(|| subject.clone());
852        let name = self
853            .name
854            .clone()
855            .unwrap_or_else(|| subject.clone().replace('.', "-"));
856        let requests = self
857            .client
858            .queue_subscribe(subject.to_owned(), self.queue_group.to_string())
859            .await?;
860        debug!("created service for endpoint {subject}");
861
862        let shutdown_rx = self.shutdown_tx.subscribe();
863
864        let mut stats = self.stats.lock().unwrap();
865        stats
866            .endpoints
867            .entry(endpoint_name.clone())
868            .or_insert(endpoint::Inner {
869                name,
870                subject: subject.clone(),
871                metadata: self.metadata.unwrap_or_default(),
872                queue_group: self.queue_group.clone(),
873                ..Default::default()
874            });
875        self.subjects.lock().unwrap().push(subject.clone());
876        Ok(Endpoint {
877            requests,
878            stats: self.stats.clone(),
879            client: self.client.clone(),
880            endpoint: endpoint_name,
881            shutdown: Some(shutdown_rx),
882            shutdown_future: None,
883        })
884    }
885}
886
887pub struct StatsHandler(pub Box<dyn FnMut(String, endpoint::Stats) -> serde_json::Value + Send>);
888
889impl std::fmt::Debug for StatsHandler {
890    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
891        write!(f, "Stats handler")
892    }
893}
894
895#[cfg(test)]
896mod tests {
897    use super::*;
898
899    #[tokio::test]
900    async fn test_group_with_queue_group() {
901        let server = nats_server::run_basic_server();
902        let client = crate::connect(server.client_url()).await.unwrap();
903
904        let group = Group {
905            prefix: "test".to_string(),
906            stats: Arc::new(Mutex::new(Endpoints {
907                endpoints: HashMap::new(),
908            })),
909            client,
910            shutdown_tx: tokio::sync::broadcast::channel(1).0,
911            subjects: Arc::new(Mutex::new(vec![])),
912            queue_group: "default".to_string(),
913        };
914
915        let new_group = group.group_with_queue_group("v1", "custom_queue");
916
917        assert_eq!(new_group.prefix, "test.v1");
918        assert_eq!(new_group.queue_group, "custom_queue");
919    }
920}