sylvia_iot_broker/routes/
mod.rs

1use std::{
2    collections::HashMap,
3    error::Error as StdError,
4    io::{Error as IoError, ErrorKind},
5    sync::{Arc, Mutex},
6};
7
8use axum::{response::IntoResponse, Router};
9use reqwest;
10use serde::{Deserialize, Serialize};
11use url::Url;
12
13use async_trait::async_trait;
14use log::{error, info, warn};
15
16use general_mq::{
17    queue::{EventHandler as QueueEventHandler, GmqQueue, Status},
18    Queue,
19};
20use sylvia_iot_corelib::{
21    constants::{CacheEngine, DbEngine},
22    http::{Json, Query},
23};
24
25use crate::{
26    libs::{
27        config::{self, Config},
28        mq::{self, application::ApplicationMgr, network::NetworkMgr, Connection},
29    },
30    models::{
31        self, Cache, CacheConnOptions, ConnOptions, DeviceOptions, DeviceRouteOptions, Model,
32        MongoDbOptions, NetworkRouteOptions, SqliteOptions,
33    },
34};
35
36pub mod middleware;
37mod v1;
38
/// The resources used by this service.
///
/// The struct is `Clone`; the shared maps are wrapped in `Arc<Mutex<..>>` so that clones refer to
/// the same connections, managers and receivers.
#[derive(Clone)]
pub struct State {
    /// The scope root path for the service.
    ///
    /// For example `/broker`, the APIs are
    /// - `http://host:port/broker/api/v1/unit/xxx`
    /// - `http://host:port/broker/api/v1/application/xxx`
    pub scope_path: &'static str,
    /// The scopes for accessing APIs.
    pub api_scopes: HashMap<String, Vec<String>>,
    /// The database model.
    pub model: Arc<dyn Model>,
    /// The database cache. `None` when no supported cache engine is configured.
    pub cache: Option<Arc<dyn Cache>>,
    /// The sylvia-iot-auth base API path with host.
    ///
    /// For example, `http://localhost:1080/auth`.
    pub auth_base: String,
    /// The `prefetch` value of the `mq` configuration.
    pub amqp_prefetch: u16,
    /// The `persistent` value of the `mq` configuration.
    pub amqp_persistent: bool,
    /// The `shared_prefix` value of the `mq` configuration.
    pub mqtt_shared_prefix: String,
    /// The client for internal HTTP requests.
    pub client: reqwest::Client,
    /// Queue connections. Key is uri.
    pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
    /// Application managers. Key is `[unit-code].[application-code]`.
    pub application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
    /// Network managers. Key is `[unit-code].[network-code]`. Unit code `_` means public network.
    pub network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
    /// Control channel receivers. Key is function such as `application`, `network`, ....
    pub ctrl_receivers: Arc<Mutex<HashMap<String, Queue>>>,
    /// Control channel senders.
    pub ctrl_senders: CtrlSenders,
    /// Data channel sender. `None` when no data channel URL is configured.
    pub data_sender: Option<Queue>,
}
76
/// Control channel senders.
///
/// There is one sender queue per resource kind; each one is created by the `new_ctrl_sender`
/// function of the matching `v1` module.
#[derive(Clone)]
pub struct CtrlSenders {
    /// Sender of the unit control channel.
    pub unit: Arc<Mutex<Queue>>,
    /// Sender of the application control channel.
    pub application: Arc<Mutex<Queue>>,
    /// Sender of the network control channel.
    pub network: Arc<Mutex<Queue>>,
    /// Sender of the device control channel.
    pub device: Arc<Mutex<Queue>>,
    /// Sender of the device route control channel.
    pub device_route: Arc<Mutex<Queue>>,
    /// Sender of the network route control channel.
    pub network_route: Arc<Mutex<Queue>>,
}
87
/// The sylvia-iot module specific error codes in addition to standard
/// [`sylvia_iot_corelib::err::ErrResp`].
///
/// The type carries no data; the error codes are exposed as associated constants.
pub struct ErrReq;
91
/// Event handler attached to the data channel sender queue.
///
/// It only writes the reported errors and status changes to the log.
struct DataSenderHandler;
93
/// Query parameters for `GET /version`
#[derive(Deserialize)]
pub struct GetVersionQuery {
    /// Optional selector: `name` returns only the service name and `version` returns only the
    /// service version. Any other value, or no value, returns the full JSON body.
    q: Option<String>,
}
99
/// Response body of `GET /version`: the version information wrapped in a `data` field.
#[derive(Serialize)]
struct GetVersionRes<'a> {
    /// The service name and version.
    data: GetVersionResData<'a>,
}
104
/// The `data` content of the `GET /version` response.
#[derive(Serialize)]
struct GetVersionResData<'a> {
    /// The service name.
    name: &'a str,
    /// The service version.
    version: &'a str,
}
110
111const SERV_NAME: &'static str = env!("CARGO_PKG_NAME");
112const SERV_VER: &'static str = env!("CARGO_PKG_VERSION");
113
/// Error code constants of the broker module.
///
/// Each constant is a `(u16, &str)` pair. The number is presumably the HTTP status code to
/// respond with (all of them are `400`) and the string is the error code — confirm against the
/// callers.
impl ErrReq {
    pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
    pub const APPLICATION_NOT_EXIST: (u16, &'static str) =
        (400, "err_broker_application_not_exist");
    pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
    pub const MEMBER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_member_not_exist");
    pub const NETWORK_ADDR_EXIST: (u16, &'static str) = (400, "err_broker_network_addr_exist");
    pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
    pub const NETWORK_NOT_EXIST: (u16, &'static str) = (400, "err_broker_network_not_exist");
    pub const OWNER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_owner_not_exist");
    pub const ROUTE_EXIST: (u16, &'static str) = (400, "err_broker_route_exist");
    pub const UNIT_EXIST: (u16, &'static str) = (400, "err_broker_unit_exist");
    pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
    pub const UNIT_NOT_MATCH: (u16, &'static str) = (400, "err_broker_unit_not_match");
}
129
130#[async_trait]
131impl QueueEventHandler for DataSenderHandler {
132    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
133        const FN_NAME: &'static str = "DataSenderHandler::on_error";
134        let queue_name = queue.name();
135        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
136    }
137
138    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
139        const FN_NAME: &'static str = "DataSenderHandler::on_status";
140        let queue_name = queue.name();
141        match status {
142            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
143            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
144        }
145    }
146}
147
148/// To create resources for the service.
149pub async fn new_state(
150    scope_path: &'static str,
151    conf: &Config,
152) -> Result<State, Box<dyn StdError>> {
153    let conf = config::apply_default(conf);
154    let db_opts = match conf.db.as_ref().unwrap().engine.as_ref().unwrap().as_str() {
155        DbEngine::MONGODB => {
156            let conf = conf.db.as_ref().unwrap().mongodb.as_ref().unwrap();
157            ConnOptions::MongoDB(MongoDbOptions {
158                url: conf.url.as_ref().unwrap().to_string(),
159                db: conf.database.as_ref().unwrap().to_string(),
160                pool_size: conf.pool_size,
161            })
162        }
163        _ => {
164            let conf = conf.db.as_ref().unwrap().sqlite.as_ref().unwrap();
165            ConnOptions::Sqlite(SqliteOptions {
166                path: conf.path.as_ref().unwrap().to_string(),
167            })
168        }
169    };
170    let cache_opts = match conf.cache.as_ref().unwrap().engine.as_ref() {
171        None => None,
172        Some(engine) => match engine.as_str() {
173            CacheEngine::MEMORY => {
174                let conf = conf.cache.as_ref().unwrap().memory.as_ref().unwrap();
175                Some(CacheConnOptions::Memory {
176                    device: DeviceOptions {
177                        uldata_size: conf.device.unwrap(),
178                    },
179                    device_route: DeviceRouteOptions {
180                        uldata_size: conf.device_route.unwrap(),
181                        dldata_size: conf.device_route.unwrap(),
182                        dldata_pub_size: conf.device_route.unwrap(),
183                    },
184                    network_route: NetworkRouteOptions {
185                        uldata_size: conf.device_route.unwrap(),
186                    },
187                })
188            }
189            _ => None,
190        },
191    };
192    let mq_conf = conf.mq.as_ref().unwrap();
193    let model = models::new(&db_opts).await?;
194    let cache = match cache_opts {
195        None => None,
196        Some(opts) => Some(models::new_cache(&opts, &model).await?),
197    };
198    let auth_base = conf.auth.as_ref().unwrap().clone();
199    let mq_conns = Arc::new(Mutex::new(HashMap::new()));
200    let ch_conf = conf.mq_channels.as_ref().unwrap();
201    let ctrl_senders = new_ctrl_senders(&mq_conns, &ch_conf, cache.clone())?;
202    let data_sender = match ch_conf.data.as_ref() {
203        None => None,
204        Some(conf) => match conf.url.as_ref() {
205            None => None,
206            Some(_) => Some(new_data_sender(&mq_conns, conf)?),
207        },
208    };
209    let state = State {
210        scope_path: match scope_path.len() {
211            0 => "/",
212            _ => scope_path,
213        },
214        api_scopes: conf.api_scopes.as_ref().unwrap().clone(),
215        model,
216        cache,
217        auth_base,
218        amqp_prefetch: mq_conf.prefetch.unwrap(),
219        amqp_persistent: mq_conf.persistent.unwrap(),
220        mqtt_shared_prefix: mq_conf.shared_prefix.as_ref().unwrap().to_string(),
221        client: reqwest::Client::new(),
222        mq_conns,
223        application_mgrs: Arc::new(Mutex::new(HashMap::new())),
224        network_mgrs: Arc::new(Mutex::new(HashMap::new())),
225        ctrl_receivers: Arc::new(Mutex::new(HashMap::new())),
226        ctrl_senders,
227        data_sender,
228    };
229    let (r1, r2, r3, r4, r5, r6) = tokio::join!(
230        v1::unit::init(&state, &ch_conf.unit.as_ref().unwrap()),
231        v1::application::init(&state, &ch_conf.application.as_ref().unwrap()),
232        v1::network::init(&state, &ch_conf.network.as_ref().unwrap()),
233        v1::device::init(&state, &ch_conf.device.as_ref().unwrap()),
234        v1::device_route::init(&state, &ch_conf.device_route.as_ref().unwrap()),
235        v1::network_route::init(&state, &ch_conf.network_route.as_ref().unwrap())
236    );
237    r1?;
238    r2?;
239    r3?;
240    r4?;
241    r5?;
242    r6?;
243    Ok(state)
244}
245
246/// To register service URIs in the specified root path.
247pub fn new_service(state: &State) -> Router {
248    Router::new().nest(
249        &state.scope_path,
250        Router::new()
251            .merge(v1::unit::new_service("/api/v1/unit", state))
252            .merge(v1::application::new_service("/api/v1/application", state))
253            .merge(v1::network::new_service("/api/v1/network", state))
254            .merge(v1::device::new_service("/api/v1/device", state))
255            .merge(v1::device_route::new_service("/api/v1/device-route", state))
256            .merge(v1::network_route::new_service(
257                "/api/v1/network-route",
258                state,
259            ))
260            .merge(v1::dldata_buffer::new_service(
261                "/api/v1/dldata-buffer",
262                state,
263            )),
264    )
265}
266
267pub fn new_ctrl_senders(
268    mq_conns: &Arc<Mutex<HashMap<String, Connection>>>,
269    ch_conf: &config::MqChannels,
270    cache: Option<Arc<dyn Cache>>,
271) -> Result<CtrlSenders, Box<dyn StdError>> {
272    let unit_ctrl_cfg = ch_conf.unit.as_ref().unwrap();
273    let app_ctrl_cfg = ch_conf.application.as_ref().unwrap();
274    let net_ctrl_cfg = ch_conf.network.as_ref().unwrap();
275    let dev_ctrl_cfg = ch_conf.device.as_ref().unwrap();
276    let devr_ctrl_cfg = ch_conf.device_route.as_ref().unwrap();
277    let netr_ctrl_cfg = ch_conf.network_route.as_ref().unwrap();
278
279    Ok(CtrlSenders {
280        unit: v1::unit::new_ctrl_sender(mq_conns, unit_ctrl_cfg)?,
281        application: v1::application::new_ctrl_sender(mq_conns, app_ctrl_cfg)?,
282        network: v1::network::new_ctrl_sender(mq_conns, net_ctrl_cfg, cache.clone())?,
283        device: v1::device::new_ctrl_sender(mq_conns, dev_ctrl_cfg, cache.clone())?,
284        device_route: v1::device_route::new_ctrl_sender(mq_conns, devr_ctrl_cfg, cache.clone())?,
285        network_route: v1::network_route::new_ctrl_sender(mq_conns, netr_ctrl_cfg, cache.clone())?,
286    })
287}
288
289/// Create data channel sender queue.
290pub fn new_data_sender(
291    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
292    config: &config::BrokerData,
293) -> Result<Queue, Box<dyn StdError>> {
294    let url = match config.url.as_ref() {
295        None => {
296            return Err(Box::new(IoError::new(
297                ErrorKind::InvalidInput,
298                "empty control url",
299            )))
300        }
301        Some(url) => match Url::parse(url.as_str()) {
302            Err(e) => return Err(Box::new(e)),
303            Ok(url) => url,
304        },
305    };
306    let persistent = match config.persistent {
307        None => false,
308        Some(persistent) => persistent,
309    };
310
311    match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
312        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
313        Ok(q) => Ok(q),
314    }
315}
316
317pub async fn get_version(Query(query): Query<GetVersionQuery>) -> impl IntoResponse {
318    if let Some(q) = query.q.as_ref() {
319        match q.as_str() {
320            "name" => return SERV_NAME.into_response(),
321            "version" => return SERV_VER.into_response(),
322            _ => (),
323        }
324    }
325
326    Json(GetVersionRes {
327        data: GetVersionResData {
328            name: SERV_NAME,
329            version: SERV_VER,
330        },
331    })
332    .into_response()
333}