// rust_cutil/cutil/message_center.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use axum::http::{HeaderMap, HeaderValue};
5use chrono::Local;
6use serde::{Deserialize, Serialize};
7use serde_json::{from_str, to_string, Value};
8use tracing::info;
9use utoipa::ToSchema;
10
11use crate::cutil::message_broker::{MessageBroker, MessageBrokerImpl, MessageBrokerOptions, Qos};
12use crate::cutil::meta::{Meta, R};
13use crate::cutil::{message_broker, message_center};
14use crate::meta;
15
16#[async_trait]
17pub trait MessageCenter: Send + Sync {
18  async fn subscribe(&self, topics: Vec<String>, qos: Qos) -> R<()>;
19
20  async fn unsubscribe(&self, topics: Vec<String>) -> R<()>;
21
22  async fn listen(&self, handler: Arc<dyn Fn(Message) -> R<()> + Send + Sync>) -> R<()>;
23
24  async fn shutdown(&self) -> R<()>;
25
26  async fn publish(&self, qos: Qos, retain: bool, message: Message) -> R<()>;
27
28  async fn publish_delay(&self, qos: Qos, retain: bool, message: Message) -> R<()>;
29}
30
31#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
32pub struct Message {
33  pub id: String,
34  pub name: String,
35  pub created: i64,
36  pub arrival: i64,
37  pub body: Value,
38}
39
40impl Default for Message {
41  fn default() -> Self {
42    Self {
43      id: "".to_string(),
44      name: "".to_string(),
45      created: Local::now().timestamp(),
46      arrival: Local::now().timestamp(),
47      body: Default::default(),
48    }
49  }
50}
51
52#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
53pub struct MessageCenterOptions {
54  pub publish_url: String,
55  pub publish_token: String,
56}
57
58pub struct MessageCenterImpl {
59  broker: Arc<dyn MessageBroker>,
60  options: MessageCenterOptions,
61}
62
63impl MessageCenterImpl {
64  pub fn new(options: MessageCenterOptions, message_broker_options: MessageBrokerOptions) -> R<Self> {
65    let broker = MessageBrokerImpl::new(message_broker_options)?;
66    Ok(Self {
67      broker: Arc::new(broker),
68      options,
69    })
70  }
71}
72
73#[async_trait]
74impl MessageCenter for MessageCenterImpl {
75  async fn subscribe(&self, topics: Vec<String>, qos: Qos) -> R<()> {
76    self.broker.subscribe(topics, qos).await
77  }
78
79  async fn unsubscribe(&self, topics: Vec<String>) -> R<()> {
80    self.broker.unsubscribe(topics).await
81  }
82
83  async fn listen(&self, handler: Arc<dyn Fn(message_center::Message) -> R<()> + Send + Sync>) -> R<()> {
84    let wrapped_handler = Arc::new(move |message_b: message_broker::Message| -> R<()> {
85      if let Ok(mut message) = from_str::<message_center::Message>(&message_b.body) {
86        handler(message)?;
87      }
88      Ok(())
89    });
90
91    self.broker.listen(wrapped_handler).await
92  }
93
94  async fn shutdown(&self) -> R<()> {
95    self.broker.shutdown().await
96  }
97
98  async fn publish(&self, qos: Qos, retain: bool, message: message_center::Message) -> R<()> {
99    let message_b = message_broker::Message {
100      name: message.name.clone(),
101      qos,
102      retain,
103      body: to_string(&message)?,
104    };
105    self.broker.publish(message_b).await
106  }
107
108  async fn publish_delay(&self, qos: Qos, retain: bool, message: message_center::Message) -> R<()> {
109    let mut headers = HeaderMap::new();
110    headers.insert(
111      "Authorization",
112      format!("token {}", self.options.publish_token.clone()).parse::<HeaderValue>()?,
113    );
114
115    let message_json = to_string(&message)?;
116    info!("publish message: {}", message_json);
117
118    let url = format!("{}?qos={:?}&retain={}", self.options.publish_url.clone(), qos, retain);
119    let client = reqwest::Client::new();
120    let res = client.post(url).headers(headers).json(&message).send().await?;
121    if !res.status().is_success() {
122      return meta!("publish_failed");
123    }
124    Ok(())
125  }
126}