1use std::{
2 collections::HashMap,
3 error::Error as StdError,
4 io::{Error as IoError, ErrorKind},
5 sync::{Arc, Mutex},
6};
7
8use axum::{response::IntoResponse, Router};
9use reqwest;
10use serde::{Deserialize, Serialize};
11use url::Url;
12
13use async_trait::async_trait;
14use log::{error, info, warn};
15
16use general_mq::{
17 queue::{EventHandler as QueueEventHandler, GmqQueue, Status},
18 Queue,
19};
20use sylvia_iot_corelib::{
21 constants::{CacheEngine, DbEngine},
22 http::{Json, Query},
23};
24
25use crate::{
26 libs::{
27 config::{self, Config},
28 mq::{self, application::ApplicationMgr, network::NetworkMgr, Connection},
29 },
30 models::{
31 self, Cache, CacheConnOptions, ConnOptions, DeviceOptions, DeviceRouteOptions, Model,
32 MongoDbOptions, NetworkRouteOptions, SqliteOptions,
33 },
34};
35
36pub mod middleware;
37mod v1;
38
39#[derive(Clone)]
41pub struct State {
42 pub scope_path: &'static str,
48 pub api_scopes: HashMap<String, Vec<String>>,
50 pub model: Arc<dyn Model>,
52 pub cache: Option<Arc<dyn Cache>>,
54 pub auth_base: String,
58 pub amqp_prefetch: u16,
59 pub amqp_persistent: bool,
60 pub mqtt_shared_prefix: String,
61 pub client: reqwest::Client,
63 pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
65 pub application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
67 pub network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
69 pub ctrl_receivers: Arc<Mutex<HashMap<String, Queue>>>,
71 pub ctrl_senders: CtrlSenders,
73 pub data_sender: Option<Queue>,
75}
76
77#[derive(Clone)]
79pub struct CtrlSenders {
80 pub unit: Arc<Mutex<Queue>>,
81 pub application: Arc<Mutex<Queue>>,
82 pub network: Arc<Mutex<Queue>>,
83 pub device: Arc<Mutex<Queue>>,
84 pub device_route: Arc<Mutex<Queue>>,
85 pub network_route: Arc<Mutex<Queue>>,
86}
87
88pub struct ErrReq;
91
92struct DataSenderHandler;
93
94#[derive(Deserialize)]
96pub struct GetVersionQuery {
97 q: Option<String>,
98}
99
100#[derive(Serialize)]
101struct GetVersionRes<'a> {
102 data: GetVersionResData<'a>,
103}
104
105#[derive(Serialize)]
106struct GetVersionResData<'a> {
107 name: &'a str,
108 version: &'a str,
109}
110
111const SERV_NAME: &'static str = env!("CARGO_PKG_NAME");
112const SERV_VER: &'static str = env!("CARGO_PKG_VERSION");
113
114impl ErrReq {
115 pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
116 pub const APPLICATION_NOT_EXIST: (u16, &'static str) =
117 (400, "err_broker_application_not_exist");
118 pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
119 pub const MEMBER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_member_not_exist");
120 pub const NETWORK_ADDR_EXIST: (u16, &'static str) = (400, "err_broker_network_addr_exist");
121 pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
122 pub const NETWORK_NOT_EXIST: (u16, &'static str) = (400, "err_broker_network_not_exist");
123 pub const OWNER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_owner_not_exist");
124 pub const ROUTE_EXIST: (u16, &'static str) = (400, "err_broker_route_exist");
125 pub const UNIT_EXIST: (u16, &'static str) = (400, "err_broker_unit_exist");
126 pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
127 pub const UNIT_NOT_MATCH: (u16, &'static str) = (400, "err_broker_unit_not_match");
128}
129
130#[async_trait]
131impl QueueEventHandler for DataSenderHandler {
132 async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
133 const FN_NAME: &'static str = "DataSenderHandler::on_error";
134 let queue_name = queue.name();
135 error!("[{}] {} error: {}", FN_NAME, queue_name, err);
136 }
137
138 async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
139 const FN_NAME: &'static str = "DataSenderHandler::on_status";
140 let queue_name = queue.name();
141 match status {
142 Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
143 _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
144 }
145 }
146}
147
148pub async fn new_state(
150 scope_path: &'static str,
151 conf: &Config,
152) -> Result<State, Box<dyn StdError>> {
153 let conf = config::apply_default(conf);
154 let db_opts = match conf.db.as_ref().unwrap().engine.as_ref().unwrap().as_str() {
155 DbEngine::MONGODB => {
156 let conf = conf.db.as_ref().unwrap().mongodb.as_ref().unwrap();
157 ConnOptions::MongoDB(MongoDbOptions {
158 url: conf.url.as_ref().unwrap().to_string(),
159 db: conf.database.as_ref().unwrap().to_string(),
160 pool_size: conf.pool_size,
161 })
162 }
163 _ => {
164 let conf = conf.db.as_ref().unwrap().sqlite.as_ref().unwrap();
165 ConnOptions::Sqlite(SqliteOptions {
166 path: conf.path.as_ref().unwrap().to_string(),
167 })
168 }
169 };
170 let cache_opts = match conf.cache.as_ref().unwrap().engine.as_ref() {
171 None => None,
172 Some(engine) => match engine.as_str() {
173 CacheEngine::MEMORY => {
174 let conf = conf.cache.as_ref().unwrap().memory.as_ref().unwrap();
175 Some(CacheConnOptions::Memory {
176 device: DeviceOptions {
177 uldata_size: conf.device.unwrap(),
178 },
179 device_route: DeviceRouteOptions {
180 uldata_size: conf.device_route.unwrap(),
181 dldata_size: conf.device_route.unwrap(),
182 dldata_pub_size: conf.device_route.unwrap(),
183 },
184 network_route: NetworkRouteOptions {
185 uldata_size: conf.device_route.unwrap(),
186 },
187 })
188 }
189 _ => None,
190 },
191 };
192 let mq_conf = conf.mq.as_ref().unwrap();
193 let model = models::new(&db_opts).await?;
194 let cache = match cache_opts {
195 None => None,
196 Some(opts) => Some(models::new_cache(&opts, &model).await?),
197 };
198 let auth_base = conf.auth.as_ref().unwrap().clone();
199 let mq_conns = Arc::new(Mutex::new(HashMap::new()));
200 let ch_conf = conf.mq_channels.as_ref().unwrap();
201 let ctrl_senders = new_ctrl_senders(&mq_conns, &ch_conf, cache.clone())?;
202 let data_sender = match ch_conf.data.as_ref() {
203 None => None,
204 Some(conf) => match conf.url.as_ref() {
205 None => None,
206 Some(_) => Some(new_data_sender(&mq_conns, conf)?),
207 },
208 };
209 let state = State {
210 scope_path: match scope_path.len() {
211 0 => "/",
212 _ => scope_path,
213 },
214 api_scopes: conf.api_scopes.as_ref().unwrap().clone(),
215 model,
216 cache,
217 auth_base,
218 amqp_prefetch: mq_conf.prefetch.unwrap(),
219 amqp_persistent: mq_conf.persistent.unwrap(),
220 mqtt_shared_prefix: mq_conf.shared_prefix.as_ref().unwrap().to_string(),
221 client: reqwest::Client::new(),
222 mq_conns,
223 application_mgrs: Arc::new(Mutex::new(HashMap::new())),
224 network_mgrs: Arc::new(Mutex::new(HashMap::new())),
225 ctrl_receivers: Arc::new(Mutex::new(HashMap::new())),
226 ctrl_senders,
227 data_sender,
228 };
229 let (r1, r2, r3, r4, r5, r6) = tokio::join!(
230 v1::unit::init(&state, &ch_conf.unit.as_ref().unwrap()),
231 v1::application::init(&state, &ch_conf.application.as_ref().unwrap()),
232 v1::network::init(&state, &ch_conf.network.as_ref().unwrap()),
233 v1::device::init(&state, &ch_conf.device.as_ref().unwrap()),
234 v1::device_route::init(&state, &ch_conf.device_route.as_ref().unwrap()),
235 v1::network_route::init(&state, &ch_conf.network_route.as_ref().unwrap())
236 );
237 r1?;
238 r2?;
239 r3?;
240 r4?;
241 r5?;
242 r6?;
243 Ok(state)
244}
245
246pub fn new_service(state: &State) -> Router {
248 Router::new().nest(
249 &state.scope_path,
250 Router::new()
251 .merge(v1::unit::new_service("/api/v1/unit", state))
252 .merge(v1::application::new_service("/api/v1/application", state))
253 .merge(v1::network::new_service("/api/v1/network", state))
254 .merge(v1::device::new_service("/api/v1/device", state))
255 .merge(v1::device_route::new_service("/api/v1/device-route", state))
256 .merge(v1::network_route::new_service(
257 "/api/v1/network-route",
258 state,
259 ))
260 .merge(v1::dldata_buffer::new_service(
261 "/api/v1/dldata-buffer",
262 state,
263 )),
264 )
265}
266
267pub fn new_ctrl_senders(
268 mq_conns: &Arc<Mutex<HashMap<String, Connection>>>,
269 ch_conf: &config::MqChannels,
270 cache: Option<Arc<dyn Cache>>,
271) -> Result<CtrlSenders, Box<dyn StdError>> {
272 let unit_ctrl_cfg = ch_conf.unit.as_ref().unwrap();
273 let app_ctrl_cfg = ch_conf.application.as_ref().unwrap();
274 let net_ctrl_cfg = ch_conf.network.as_ref().unwrap();
275 let dev_ctrl_cfg = ch_conf.device.as_ref().unwrap();
276 let devr_ctrl_cfg = ch_conf.device_route.as_ref().unwrap();
277 let netr_ctrl_cfg = ch_conf.network_route.as_ref().unwrap();
278
279 Ok(CtrlSenders {
280 unit: v1::unit::new_ctrl_sender(mq_conns, unit_ctrl_cfg)?,
281 application: v1::application::new_ctrl_sender(mq_conns, app_ctrl_cfg)?,
282 network: v1::network::new_ctrl_sender(mq_conns, net_ctrl_cfg, cache.clone())?,
283 device: v1::device::new_ctrl_sender(mq_conns, dev_ctrl_cfg, cache.clone())?,
284 device_route: v1::device_route::new_ctrl_sender(mq_conns, devr_ctrl_cfg, cache.clone())?,
285 network_route: v1::network_route::new_ctrl_sender(mq_conns, netr_ctrl_cfg, cache.clone())?,
286 })
287}
288
289pub fn new_data_sender(
291 conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
292 config: &config::BrokerData,
293) -> Result<Queue, Box<dyn StdError>> {
294 let url = match config.url.as_ref() {
295 None => {
296 return Err(Box::new(IoError::new(
297 ErrorKind::InvalidInput,
298 "empty control url",
299 )))
300 }
301 Some(url) => match Url::parse(url.as_str()) {
302 Err(e) => return Err(Box::new(e)),
303 Ok(url) => url,
304 },
305 };
306 let persistent = match config.persistent {
307 None => false,
308 Some(persistent) => persistent,
309 };
310
311 match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
312 Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
313 Ok(q) => Ok(q),
314 }
315}
316
317pub async fn get_version(Query(query): Query<GetVersionQuery>) -> impl IntoResponse {
318 if let Some(q) = query.q.as_ref() {
319 match q.as_str() {
320 "name" => return SERV_NAME.into_response(),
321 "version" => return SERV_VER.into_response(),
322 _ => (),
323 }
324 }
325
326 Json(GetVersionRes {
327 data: GetVersionResData {
328 name: SERV_NAME,
329 version: SERV_VER,
330 },
331 })
332 .into_response()
333}