sylvia_iot_broker/models/memory/
device_route.rs

1use std::{error::Error as StdError, num::NonZeroUsize, sync::Arc};
2
3use async_trait::async_trait;
4use lru::LruCache;
5use tokio::sync::RwLock;
6
7use super::super::{
8    device::{QueryCond as DeviceQueryCond, QueryOneCond as DeviceQueryOneCond},
9    device_route::{
10        DelCachePubQueryCond, DelCacheQueryCond, DeviceRouteCache, DeviceRouteCacheDlData,
11        DeviceRouteCacheUlData, GetCachePubQueryCond, GetCacheQueryCond, ListOptions,
12        ListQueryCond,
13    },
14    Model,
15};
16
/// In-memory device route cache made of three independent LRU maps.
///
/// Every map stores `Option` values, so a key may be present with a `None`
/// value (the `set_*` methods push `None` as a regular entry).
pub struct Cache {
    /// Model used to load data when a `get_*` call misses the cache.
    model: Arc<dyn Model>,
    /// Uplink data, keyed by the device ID.
    uldata: Arc<RwLock<LruCache<String, Option<DeviceRouteCacheUlData>>>>,
    /// Downlink data, keyed by `{unit_code}.{network_code}.{network_addr}`.
    dldata: Arc<RwLock<LruCache<String, Option<DeviceRouteCacheDlData>>>>,
    /// Downlink data, keyed by `{unit_id}.{device_id}`.
    dldata_pub: Arc<RwLock<LruCache<String, Option<DeviceRouteCacheDlData>>>>,
}
23
/// Capacity settings used by `Cache::new()`.
///
/// Each value is passed as the capacity of the corresponding LRU map and is
/// expected to be greater than zero.
pub struct Options {
    /// Capacity of the uplink data cache.
    pub uldata_size: usize,
    /// Capacity of the downlink data cache.
    pub dldata_size: usize,
    /// Capacity of the `dldata_pub` downlink data cache.
    pub dldata_pub_size: usize,
}
29
/// Default capacity applied to every cache by `Options::default()`.
const DEF_SIZE: usize = 10_000;
31
32impl Cache {
33    pub fn new(opts: &Options, model: Arc<dyn Model>) -> Self {
34        let (uldata, dldata, dldata_pub) = unsafe {
35            (
36                NonZeroUsize::new_unchecked(opts.uldata_size),
37                NonZeroUsize::new_unchecked(opts.dldata_size),
38                NonZeroUsize::new_unchecked(opts.dldata_pub_size),
39            )
40        };
41        Cache {
42            model,
43            uldata: Arc::new(RwLock::new(LruCache::new(uldata))),
44            dldata: Arc::new(RwLock::new(LruCache::new(dldata))),
45            dldata_pub: Arc::new(RwLock::new(LruCache::new(dldata_pub))),
46        }
47    }
48}
49
50#[async_trait]
51impl DeviceRouteCache for Cache {
52    async fn clear(&self) -> Result<(), Box<dyn StdError>> {
53        // To collect all locks before clearing cache.
54        let mut lock1 = self.uldata.write().await;
55        let mut lock2 = self.dldata.write().await;
56        let mut lock3 = self.dldata_pub.write().await;
57        lock1.clear();
58        lock2.clear();
59        lock3.clear();
60        Ok(())
61    }
62
63    async fn get_uldata(
64        &self,
65        device_id: &str,
66    ) -> Result<Option<DeviceRouteCacheUlData>, Box<dyn StdError>> {
67        {
68            let mut lock = self.uldata.write().await;
69            if let Some(value) = lock.get(device_id) {
70                return Ok(value.clone());
71            }
72        }
73
74        let opts = ListOptions {
75            cond: &ListQueryCond {
76                device_id: Some(device_id),
77                ..Default::default()
78            },
79            offset: None,
80            limit: None,
81            sort: None,
82            cursor_max: None,
83        };
84        let (mut routes, _) = self.model.device_route().list(&opts, None).await?;
85        let data: Option<DeviceRouteCacheUlData> = match routes.len() {
86            0 => None,
87            _ => {
88                let mut routes_data = vec![];
89                for r in routes.iter() {
90                    routes_data.push(format!("{}.{}", r.unit_code, r.application_code))
91                }
92                let _ = routes.pop().unwrap();
93                Some(DeviceRouteCacheUlData {
94                    app_mgr_keys: routes_data,
95                })
96            }
97        };
98        let _ = self.set_uldata(device_id, data.as_ref()).await;
99        Ok(data)
100    }
101
102    async fn set_uldata(
103        &self,
104        device_id: &str,
105        value: Option<&DeviceRouteCacheUlData>,
106    ) -> Result<(), Box<dyn StdError>> {
107        let key = device_id.to_string();
108        let mut lock = self.uldata.write().await;
109        let _ = match value {
110            None => lock.push(key, None),
111            Some(value) => lock.push(key, Some(value.clone())),
112        };
113        Ok(())
114    }
115
116    async fn del_uldata(&self, device_id: &str) -> Result<(), Box<dyn StdError>> {
117        let mut lock = self.uldata.write().await;
118        lock.pop(device_id);
119        Ok(())
120    }
121
122    async fn get_dldata(
123        &self,
124        cond: &GetCacheQueryCond,
125    ) -> Result<Option<DeviceRouteCacheDlData>, Box<dyn StdError>> {
126        let key = format!(
127            "{}.{}.{}",
128            cond.unit_code, cond.network_code, cond.network_addr
129        );
130
131        {
132            let mut lock = self.dldata.write().await;
133            if let Some(value) = lock.get(&key) {
134                match value {
135                    None => return Ok(None),
136                    Some(value) => return Ok(Some(value.clone())),
137                }
138            }
139        }
140
141        let dev_cond = DeviceQueryCond {
142            device: Some(DeviceQueryOneCond {
143                unit_code: Some(cond.unit_code),
144                network_code: cond.network_code,
145                network_addr: cond.network_addr,
146            }),
147            ..Default::default()
148        };
149        let device = self.model.device().get(&dev_cond).await?;
150        let data = match device {
151            None => None,
152            Some(device) => match device.unit_code.as_ref() {
153                // This should not occur!
154                None => None,
155                Some(unit_code) => Some(DeviceRouteCacheDlData {
156                    net_mgr_key: format!("{}.{}", unit_code, cond.network_code),
157                    network_id: device.network_id,
158                    network_addr: device.network_addr,
159                    device_id: device.device_id,
160                    profile: device.profile,
161                }),
162            },
163        };
164        let _ = self.set_dldata(cond, data.as_ref()).await;
165        Ok(data)
166    }
167
168    async fn set_dldata(
169        &self,
170        cond: &GetCacheQueryCond,
171        value: Option<&DeviceRouteCacheDlData>,
172    ) -> Result<(), Box<dyn StdError>> {
173        let key = format!(
174            "{}.{}.{}",
175            cond.unit_code, cond.network_code, cond.network_addr
176        );
177        let mut lock = self.dldata.write().await;
178        let _ = match value {
179            None => lock.push(key, None),
180            Some(value) => lock.push(key, Some(value.clone())),
181        };
182        Ok(())
183    }
184
185    async fn del_dldata(&self, cond: &DelCacheQueryCond) -> Result<(), Box<dyn StdError>> {
186        let key = match cond.network_code {
187            None => {
188                // Remove all routes of the unit.
189                cond.unit_code.to_string()
190            }
191            Some(code) => match cond.network_addr {
192                None => {
193                    // Remove all routes of the network.
194                    format!("{}.{}", cond.unit_code, code)
195                }
196                Some(addr) => {
197                    let key = format!("{}.{}.{}", cond.unit_code, code, addr);
198                    let mut lock = self.dldata.write().await;
199                    let _ = lock.pop(&key);
200                    return Ok(());
201                }
202            },
203        };
204        {
205            let mut lock = self.dldata.write().await;
206            loop {
207                let mut rm_key = None;
208                for (k, _) in lock.iter() {
209                    if k.starts_with(key.as_str()) {
210                        rm_key = Some(k.clone());
211                        break;
212                    }
213                }
214                match rm_key {
215                    None => break,
216                    Some(key) => {
217                        let _ = lock.pop(&key);
218                    }
219                }
220            }
221        }
222        Ok(())
223    }
224
225    async fn get_dldata_pub(
226        &self,
227        cond: &GetCachePubQueryCond,
228    ) -> Result<Option<DeviceRouteCacheDlData>, Box<dyn StdError>> {
229        let key = format!("{}.{}", cond.unit_id, cond.device_id);
230
231        {
232            let mut lock = self.dldata_pub.write().await;
233            if let Some(value) = lock.get(&key) {
234                match value {
235                    None => return Ok(None),
236                    Some(value) => return Ok(Some(value.clone())),
237                }
238            }
239        }
240
241        let dev_cond = DeviceQueryCond {
242            unit_id: Some(cond.unit_id),
243            device_id: Some(cond.device_id),
244            ..Default::default()
245        };
246        let device = self.model.device().get(&dev_cond).await?;
247        let data = match device {
248            None => None,
249            Some(device) => match device.unit_code.as_ref() {
250                None => Some(DeviceRouteCacheDlData {
251                    net_mgr_key: format!(".{}", device.network_code),
252                    network_id: device.network_id,
253                    network_addr: device.network_addr,
254                    device_id: device.device_id,
255                    profile: device.profile,
256                }),
257                Some(unit_code) => Some(DeviceRouteCacheDlData {
258                    net_mgr_key: format!("{}.{}", unit_code, device.network_code),
259                    network_id: device.network_id,
260                    network_addr: device.network_addr,
261                    device_id: device.device_id,
262                    profile: device.profile,
263                }),
264            },
265        };
266        let _ = self.set_dldata_pub(cond, data.as_ref()).await;
267        Ok(data)
268    }
269
270    async fn set_dldata_pub(
271        &self,
272        cond: &GetCachePubQueryCond,
273        value: Option<&DeviceRouteCacheDlData>,
274    ) -> Result<(), Box<dyn StdError>> {
275        let key = format!("{}.{}", cond.unit_id, cond.device_id);
276        let mut lock = self.dldata_pub.write().await;
277        let _ = match value {
278            None => lock.push(key, None),
279            Some(value) => lock.push(key, Some(value.clone())),
280        };
281        Ok(())
282    }
283
284    async fn del_dldata_pub(&self, cond: &DelCachePubQueryCond) -> Result<(), Box<dyn StdError>> {
285        let key = match cond.device_id {
286            None => {
287                // Remove all routes of the unit.
288                cond.unit_id.to_string()
289            }
290            Some(id) => {
291                let key = format!("{}.{}", cond.unit_id, id);
292                {
293                    let mut lock = self.dldata_pub.write().await;
294                    lock.pop(&key);
295                }
296                return Ok(());
297            }
298        };
299        {
300            let mut lock = self.dldata_pub.write().await;
301            loop {
302                let mut rm_key = None;
303                for (k, _) in lock.iter() {
304                    if k.starts_with(key.as_str()) {
305                        rm_key = Some(k.clone());
306                        break;
307                    }
308                }
309                match rm_key {
310                    None => break,
311                    Some(key) => {
312                        let _ = lock.pop(&key);
313                    }
314                }
315            }
316        }
317        Ok(())
318    }
319}
320
321impl Default for Options {
322    fn default() -> Self {
323        Options {
324            uldata_size: DEF_SIZE,
325            dldata_size: DEF_SIZE,
326            dldata_pub_size: DEF_SIZE,
327        }
328    }
329}