sylvia_iot_broker/models/memory/
device_route.rs1use std::{error::Error as StdError, num::NonZeroUsize, sync::Arc};
2
3use async_trait::async_trait;
4use lru::LruCache;
5use tokio::sync::RwLock;
6
7use super::super::{
8 device::{QueryCond as DeviceQueryCond, QueryOneCond as DeviceQueryOneCond},
9 device_route::{
10 DelCachePubQueryCond, DelCacheQueryCond, DeviceRouteCache, DeviceRouteCacheDlData,
11 DeviceRouteCacheUlData, GetCachePubQueryCond, GetCacheQueryCond, ListOptions,
12 ListQueryCond,
13 },
14 Model,
15};
16
17pub struct Cache {
18 model: Arc<dyn Model>,
19 uldata: Arc<RwLock<LruCache<String, Option<DeviceRouteCacheUlData>>>>,
20 dldata: Arc<RwLock<LruCache<String, Option<DeviceRouteCacheDlData>>>>,
21 dldata_pub: Arc<RwLock<LruCache<String, Option<DeviceRouteCacheDlData>>>>,
22}
23
/// Capacities of the LRU maps held by `Cache`.
#[derive(Debug, Clone)]
pub struct Options {
    /// Capacity of the uplink data cache.
    pub uldata_size: usize,
    /// Capacity of the downlink data cache.
    pub dldata_size: usize,
    /// Capacity of the downlink data cache keyed by unit ID and device ID.
    pub dldata_pub_size: usize,
}

/// Capacity used for every cache by `Options::default()`.
const DEF_SIZE: usize = 10_000;
31
32impl Cache {
33 pub fn new(opts: &Options, model: Arc<dyn Model>) -> Self {
34 let (uldata, dldata, dldata_pub) = unsafe {
35 (
36 NonZeroUsize::new_unchecked(opts.uldata_size),
37 NonZeroUsize::new_unchecked(opts.dldata_size),
38 NonZeroUsize::new_unchecked(opts.dldata_pub_size),
39 )
40 };
41 Cache {
42 model,
43 uldata: Arc::new(RwLock::new(LruCache::new(uldata))),
44 dldata: Arc::new(RwLock::new(LruCache::new(dldata))),
45 dldata_pub: Arc::new(RwLock::new(LruCache::new(dldata_pub))),
46 }
47 }
48}
49
50#[async_trait]
51impl DeviceRouteCache for Cache {
52 async fn clear(&self) -> Result<(), Box<dyn StdError>> {
53 let mut lock1 = self.uldata.write().await;
55 let mut lock2 = self.dldata.write().await;
56 let mut lock3 = self.dldata_pub.write().await;
57 lock1.clear();
58 lock2.clear();
59 lock3.clear();
60 Ok(())
61 }
62
63 async fn get_uldata(
64 &self,
65 device_id: &str,
66 ) -> Result<Option<DeviceRouteCacheUlData>, Box<dyn StdError>> {
67 {
68 let mut lock = self.uldata.write().await;
69 if let Some(value) = lock.get(device_id) {
70 return Ok(value.clone());
71 }
72 }
73
74 let opts = ListOptions {
75 cond: &ListQueryCond {
76 device_id: Some(device_id),
77 ..Default::default()
78 },
79 offset: None,
80 limit: None,
81 sort: None,
82 cursor_max: None,
83 };
84 let (mut routes, _) = self.model.device_route().list(&opts, None).await?;
85 let data: Option<DeviceRouteCacheUlData> = match routes.len() {
86 0 => None,
87 _ => {
88 let mut routes_data = vec![];
89 for r in routes.iter() {
90 routes_data.push(format!("{}.{}", r.unit_code, r.application_code))
91 }
92 let _ = routes.pop().unwrap();
93 Some(DeviceRouteCacheUlData {
94 app_mgr_keys: routes_data,
95 })
96 }
97 };
98 let _ = self.set_uldata(device_id, data.as_ref()).await;
99 Ok(data)
100 }
101
102 async fn set_uldata(
103 &self,
104 device_id: &str,
105 value: Option<&DeviceRouteCacheUlData>,
106 ) -> Result<(), Box<dyn StdError>> {
107 let key = device_id.to_string();
108 let mut lock = self.uldata.write().await;
109 let _ = match value {
110 None => lock.push(key, None),
111 Some(value) => lock.push(key, Some(value.clone())),
112 };
113 Ok(())
114 }
115
116 async fn del_uldata(&self, device_id: &str) -> Result<(), Box<dyn StdError>> {
117 let mut lock = self.uldata.write().await;
118 lock.pop(device_id);
119 Ok(())
120 }
121
122 async fn get_dldata(
123 &self,
124 cond: &GetCacheQueryCond,
125 ) -> Result<Option<DeviceRouteCacheDlData>, Box<dyn StdError>> {
126 let key = format!(
127 "{}.{}.{}",
128 cond.unit_code, cond.network_code, cond.network_addr
129 );
130
131 {
132 let mut lock = self.dldata.write().await;
133 if let Some(value) = lock.get(&key) {
134 match value {
135 None => return Ok(None),
136 Some(value) => return Ok(Some(value.clone())),
137 }
138 }
139 }
140
141 let dev_cond = DeviceQueryCond {
142 device: Some(DeviceQueryOneCond {
143 unit_code: Some(cond.unit_code),
144 network_code: cond.network_code,
145 network_addr: cond.network_addr,
146 }),
147 ..Default::default()
148 };
149 let device = self.model.device().get(&dev_cond).await?;
150 let data = match device {
151 None => None,
152 Some(device) => match device.unit_code.as_ref() {
153 None => None,
155 Some(unit_code) => Some(DeviceRouteCacheDlData {
156 net_mgr_key: format!("{}.{}", unit_code, cond.network_code),
157 network_id: device.network_id,
158 network_addr: device.network_addr,
159 device_id: device.device_id,
160 profile: device.profile,
161 }),
162 },
163 };
164 let _ = self.set_dldata(cond, data.as_ref()).await;
165 Ok(data)
166 }
167
168 async fn set_dldata(
169 &self,
170 cond: &GetCacheQueryCond,
171 value: Option<&DeviceRouteCacheDlData>,
172 ) -> Result<(), Box<dyn StdError>> {
173 let key = format!(
174 "{}.{}.{}",
175 cond.unit_code, cond.network_code, cond.network_addr
176 );
177 let mut lock = self.dldata.write().await;
178 let _ = match value {
179 None => lock.push(key, None),
180 Some(value) => lock.push(key, Some(value.clone())),
181 };
182 Ok(())
183 }
184
185 async fn del_dldata(&self, cond: &DelCacheQueryCond) -> Result<(), Box<dyn StdError>> {
186 let key = match cond.network_code {
187 None => {
188 cond.unit_code.to_string()
190 }
191 Some(code) => match cond.network_addr {
192 None => {
193 format!("{}.{}", cond.unit_code, code)
195 }
196 Some(addr) => {
197 let key = format!("{}.{}.{}", cond.unit_code, code, addr);
198 let mut lock = self.dldata.write().await;
199 let _ = lock.pop(&key);
200 return Ok(());
201 }
202 },
203 };
204 {
205 let mut lock = self.dldata.write().await;
206 loop {
207 let mut rm_key = None;
208 for (k, _) in lock.iter() {
209 if k.starts_with(key.as_str()) {
210 rm_key = Some(k.clone());
211 break;
212 }
213 }
214 match rm_key {
215 None => break,
216 Some(key) => {
217 let _ = lock.pop(&key);
218 }
219 }
220 }
221 }
222 Ok(())
223 }
224
225 async fn get_dldata_pub(
226 &self,
227 cond: &GetCachePubQueryCond,
228 ) -> Result<Option<DeviceRouteCacheDlData>, Box<dyn StdError>> {
229 let key = format!("{}.{}", cond.unit_id, cond.device_id);
230
231 {
232 let mut lock = self.dldata_pub.write().await;
233 if let Some(value) = lock.get(&key) {
234 match value {
235 None => return Ok(None),
236 Some(value) => return Ok(Some(value.clone())),
237 }
238 }
239 }
240
241 let dev_cond = DeviceQueryCond {
242 unit_id: Some(cond.unit_id),
243 device_id: Some(cond.device_id),
244 ..Default::default()
245 };
246 let device = self.model.device().get(&dev_cond).await?;
247 let data = match device {
248 None => None,
249 Some(device) => match device.unit_code.as_ref() {
250 None => Some(DeviceRouteCacheDlData {
251 net_mgr_key: format!(".{}", device.network_code),
252 network_id: device.network_id,
253 network_addr: device.network_addr,
254 device_id: device.device_id,
255 profile: device.profile,
256 }),
257 Some(unit_code) => Some(DeviceRouteCacheDlData {
258 net_mgr_key: format!("{}.{}", unit_code, device.network_code),
259 network_id: device.network_id,
260 network_addr: device.network_addr,
261 device_id: device.device_id,
262 profile: device.profile,
263 }),
264 },
265 };
266 let _ = self.set_dldata_pub(cond, data.as_ref()).await;
267 Ok(data)
268 }
269
270 async fn set_dldata_pub(
271 &self,
272 cond: &GetCachePubQueryCond,
273 value: Option<&DeviceRouteCacheDlData>,
274 ) -> Result<(), Box<dyn StdError>> {
275 let key = format!("{}.{}", cond.unit_id, cond.device_id);
276 let mut lock = self.dldata_pub.write().await;
277 let _ = match value {
278 None => lock.push(key, None),
279 Some(value) => lock.push(key, Some(value.clone())),
280 };
281 Ok(())
282 }
283
284 async fn del_dldata_pub(&self, cond: &DelCachePubQueryCond) -> Result<(), Box<dyn StdError>> {
285 let key = match cond.device_id {
286 None => {
287 cond.unit_id.to_string()
289 }
290 Some(id) => {
291 let key = format!("{}.{}", cond.unit_id, id);
292 {
293 let mut lock = self.dldata_pub.write().await;
294 lock.pop(&key);
295 }
296 return Ok(());
297 }
298 };
299 {
300 let mut lock = self.dldata_pub.write().await;
301 loop {
302 let mut rm_key = None;
303 for (k, _) in lock.iter() {
304 if k.starts_with(key.as_str()) {
305 rm_key = Some(k.clone());
306 break;
307 }
308 }
309 match rm_key {
310 None => break,
311 Some(key) => {
312 let _ = lock.pop(&key);
313 }
314 }
315 }
316 }
317 Ok(())
318 }
319}
320
321impl Default for Options {
322 fn default() -> Self {
323 Options {
324 uldata_size: DEF_SIZE,
325 dldata_size: DEF_SIZE,
326 dldata_pub_size: DEF_SIZE,
327 }
328 }
329}