rust_cutil/cutil/
message_center.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use axum::http::{HeaderMap, HeaderValue};
5use chrono::Local;
6use serde::{Deserialize, Serialize};
7use serde_json::{from_str, to_string, Value};
8use tracing::info;
9use utoipa::ToSchema;
10
11use crate::cutil::message_broker::{MessageBroker, MessageBrokerImpl, MessageBrokerOptions, Qos};
12use crate::cutil::meta::{Meta, R};
13use crate::cutil::{message_broker, message_center};
14use crate::meta;
15
16#[async_trait]
17pub trait MessageCenter: Send + Sync {
18 async fn subscribe(&self, topics: Vec<String>, qos: Qos) -> R<()>;
19
20 async fn unsubscribe(&self, topics: Vec<String>) -> R<()>;
21
22 async fn listen(&self, handler: Arc<dyn Fn(Message) -> R<()> + Send + Sync>) -> R<()>;
23
24 async fn shutdown(&self) -> R<()>;
25
26 async fn publish(&self, qos: Qos, retain: bool, message: Message) -> R<()>;
27
28 async fn publish_delay(&self, qos: Qos, retain: bool, message: Message) -> R<()>;
29}
30
31#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
32pub struct Message {
33 pub id: String,
34 pub name: String,
35 pub created: i64,
36 pub arrival: i64,
37 pub body: Value,
38}
39
40impl Default for Message {
41 fn default() -> Self {
42 Self {
43 id: "".to_string(),
44 name: "".to_string(),
45 created: Local::now().timestamp(),
46 arrival: Local::now().timestamp(),
47 body: Default::default(),
48 }
49 }
50}
51
52#[derive(Debug, Serialize, Deserialize, Clone, ToSchema)]
53pub struct MessageCenterOptions {
54 pub publish_url: String,
55 pub publish_token: String,
56}
57
58pub struct MessageCenterImpl {
59 broker: Arc<dyn MessageBroker>,
60 options: MessageCenterOptions,
61}
62
63impl MessageCenterImpl {
64 pub fn new(options: MessageCenterOptions, message_broker_options: MessageBrokerOptions) -> R<Self> {
65 let broker = MessageBrokerImpl::new(message_broker_options)?;
66 Ok(Self {
67 broker: Arc::new(broker),
68 options,
69 })
70 }
71}
72
73#[async_trait]
74impl MessageCenter for MessageCenterImpl {
75 async fn subscribe(&self, topics: Vec<String>, qos: Qos) -> R<()> {
76 self.broker.subscribe(topics, qos).await
77 }
78
79 async fn unsubscribe(&self, topics: Vec<String>) -> R<()> {
80 self.broker.unsubscribe(topics).await
81 }
82
83 async fn listen(&self, handler: Arc<dyn Fn(message_center::Message) -> R<()> + Send + Sync>) -> R<()> {
84 let wrapped_handler = Arc::new(move |message_b: message_broker::Message| -> R<()> {
85 if let Ok(mut message) = from_str::<message_center::Message>(&message_b.body) {
86 handler(message)?;
87 }
88 Ok(())
89 });
90
91 self.broker.listen(wrapped_handler).await
92 }
93
94 async fn shutdown(&self) -> R<()> {
95 self.broker.shutdown().await
96 }
97
98 async fn publish(&self, qos: Qos, retain: bool, message: message_center::Message) -> R<()> {
99 let message_b = message_broker::Message {
100 name: message.name.clone(),
101 qos,
102 retain,
103 body: to_string(&message)?,
104 };
105 self.broker.publish(message_b).await
106 }
107
108 async fn publish_delay(&self, qos: Qos, retain: bool, message: message_center::Message) -> R<()> {
109 let mut headers = HeaderMap::new();
110 headers.insert(
111 "Authorization",
112 format!("token {}", self.options.publish_token.clone()).parse::<HeaderValue>()?,
113 );
114
115 let message_json = to_string(&message)?;
116 info!("publish message: {}", message_json);
117
118 let url = format!("{}?qos={:?}&retain={}", self.options.publish_url.clone(), qos, retain);
119 let client = reqwest::Client::new();
120 let res = client.post(url).headers(headers).json(&message).send().await?;
121 if !res.status().is_success() {
122 return meta!("publish_failed");
123 }
124 Ok(())
125 }
126}