sylvia_iot_broker/models/memory/
device.rs

use std::{error::Error as StdError, num::NonZeroUsize, sync::Arc};

use async_trait::async_trait;
use lru::LruCache;
use tokio::sync::RwLock;

use super::super::{
    device::{
        DelCacheQueryCond, DeviceCache, DeviceCacheItem, GetCacheQueryCond, QueryCond, QueryOneCond,
    },
    Model,
};
13
14pub struct Cache {
15    model: Arc<dyn Model>,
16    uldata: Arc<RwLock<LruCache<String, Option<DeviceCacheItem>>>>,
17}
18
/// Construction options for [`Cache`].
pub struct Options {
    /// Capacity, in entries, of the `uldata` LRU cache.
    pub uldata_size: usize,
}
22
/// Default capacity of the `uldata` cache, used by `Options::default()`.
const DEF_SIZE: usize = 10_000;
24
25impl Cache {
26    pub fn new(opts: &Options, model: Arc<dyn Model>) -> Self {
27        let uldata = unsafe { NonZeroUsize::new_unchecked(opts.uldata_size) };
28        Cache {
29            model,
30            uldata: Arc::new(RwLock::new(LruCache::new(uldata))),
31        }
32    }
33}
34
35#[async_trait]
36impl DeviceCache for Cache {
37    async fn clear(&self) -> Result<(), Box<dyn StdError>> {
38        // To collect all locks before clearing cache.
39        let mut lock = self.uldata.write().await;
40        lock.clear();
41        Ok(())
42    }
43
44    async fn get(
45        &self,
46        cond: &GetCacheQueryCond,
47    ) -> Result<Option<DeviceCacheItem>, Box<dyn StdError>> {
48        // Try to hit cache first, or returns a model query condition.
49        let model_cond = match cond {
50            GetCacheQueryCond::CodeAddr(cond) => {
51                let key = match cond.unit_code {
52                    None => format!(".{}.{}", cond.network_code, cond.network_addr),
53                    Some(unit) => format!("{}.{}.{}", unit, cond.network_code, cond.network_addr),
54                };
55                {
56                    let mut lock = self.uldata.write().await;
57                    if let Some(value) = lock.get(&key) {
58                        return Ok(value.clone());
59                    }
60                }
61                QueryCond {
62                    device: Some(QueryOneCond {
63                        unit_code: cond.unit_code,
64                        network_code: cond.network_code,
65                        network_addr: cond.network_addr,
66                    }),
67                    ..Default::default()
68                }
69            }
70        };
71
72        let item = match self.model.device().get(&model_cond).await? {
73            None => None,
74            Some(device) => Some(DeviceCacheItem {
75                device_id: device.device_id,
76                profile: device.profile,
77            }),
78        };
79        let _ = self.set(cond, item.as_ref()).await;
80        Ok(item)
81    }
82
83    async fn set(
84        &self,
85        cond: &GetCacheQueryCond,
86        value: Option<&DeviceCacheItem>,
87    ) -> Result<(), Box<dyn StdError>> {
88        match cond {
89            GetCacheQueryCond::CodeAddr(cond) => {
90                let key = match cond.unit_code {
91                    None => format!(".{}.{}", cond.network_code, cond.network_addr),
92                    Some(unit) => format!("{}.{}.{}", unit, cond.network_code, cond.network_addr),
93                };
94                {
95                    let mut lock = self.uldata.write().await;
96                    let _ = match value {
97                        None => lock.push(key, None),
98                        Some(value) => lock.push(key, Some(value.clone())),
99                    };
100                }
101            }
102        }
103        Ok(())
104    }
105
106    async fn del(&self, cond: &DelCacheQueryCond) -> Result<(), Box<dyn StdError>> {
107        let key = match cond.network_code {
108            None => {
109                // Disallow deleting all devices of public networks.
110                if cond.unit_code.len() == 0 {
111                    return Ok(());
112                }
113
114                // Remove all routes of the unit.
115                cond.unit_code.to_string()
116            }
117            Some(code) => match cond.network_addr {
118                None => {
119                    // Remove all routes of the network.
120                    format!("{}.{}", cond.unit_code, code)
121                }
122                Some(addr) => {
123                    let key = format!("{}.{}.{}", cond.unit_code, code, addr);
124                    {
125                        let mut lock = self.uldata.write().await;
126                        lock.pop(&key);
127                    }
128                    return Ok(());
129                }
130            },
131        };
132        {
133            let mut lock = self.uldata.write().await;
134            loop {
135                let mut rm_key = None;
136                for (k, _) in lock.iter() {
137                    if k.starts_with(key.as_str()) {
138                        rm_key = Some(k.clone());
139                        break;
140                    }
141                }
142                match rm_key {
143                    None => break,
144                    Some(key) => {
145                        let _ = lock.pop(&key);
146                    }
147                }
148            }
149        }
150        Ok(())
151    }
152}
153
154impl Default for Options {
155    fn default() -> Self {
156        Options {
157            uldata_size: DEF_SIZE,
158        }
159    }
160}