sylvia_iot_broker/models/memory/
device.rs1use std::{error::Error as StdError, num::NonZeroUsize, sync::Arc};
2
3use async_trait::async_trait;
4use lru::LruCache;
5use tokio::sync::RwLock;
6
7use super::super::{
8 device::{
9 DelCacheQueryCond, DeviceCache, DeviceCacheItem, GetCacheQueryCond, QueryCond, QueryOneCond,
10 },
11 Model,
12};
13
14pub struct Cache {
15 model: Arc<dyn Model>,
16 uldata: Arc<RwLock<LruCache<String, Option<DeviceCacheItem>>>>,
17}
18
19pub struct Options {
20 pub uldata_size: usize,
21}
22
23const DEF_SIZE: usize = 10_000;
24
25impl Cache {
26 pub fn new(opts: &Options, model: Arc<dyn Model>) -> Self {
27 let uldata = unsafe { NonZeroUsize::new_unchecked(opts.uldata_size) };
28 Cache {
29 model,
30 uldata: Arc::new(RwLock::new(LruCache::new(uldata))),
31 }
32 }
33}
34
35#[async_trait]
36impl DeviceCache for Cache {
37 async fn clear(&self) -> Result<(), Box<dyn StdError>> {
38 let mut lock = self.uldata.write().await;
40 lock.clear();
41 Ok(())
42 }
43
44 async fn get(
45 &self,
46 cond: &GetCacheQueryCond,
47 ) -> Result<Option<DeviceCacheItem>, Box<dyn StdError>> {
48 let model_cond = match cond {
50 GetCacheQueryCond::CodeAddr(cond) => {
51 let key = match cond.unit_code {
52 None => format!(".{}.{}", cond.network_code, cond.network_addr),
53 Some(unit) => format!("{}.{}.{}", unit, cond.network_code, cond.network_addr),
54 };
55 {
56 let mut lock = self.uldata.write().await;
57 if let Some(value) = lock.get(&key) {
58 return Ok(value.clone());
59 }
60 }
61 QueryCond {
62 device: Some(QueryOneCond {
63 unit_code: cond.unit_code,
64 network_code: cond.network_code,
65 network_addr: cond.network_addr,
66 }),
67 ..Default::default()
68 }
69 }
70 };
71
72 let item = match self.model.device().get(&model_cond).await? {
73 None => None,
74 Some(device) => Some(DeviceCacheItem {
75 device_id: device.device_id,
76 profile: device.profile,
77 }),
78 };
79 let _ = self.set(cond, item.as_ref()).await;
80 Ok(item)
81 }
82
83 async fn set(
84 &self,
85 cond: &GetCacheQueryCond,
86 value: Option<&DeviceCacheItem>,
87 ) -> Result<(), Box<dyn StdError>> {
88 match cond {
89 GetCacheQueryCond::CodeAddr(cond) => {
90 let key = match cond.unit_code {
91 None => format!(".{}.{}", cond.network_code, cond.network_addr),
92 Some(unit) => format!("{}.{}.{}", unit, cond.network_code, cond.network_addr),
93 };
94 {
95 let mut lock = self.uldata.write().await;
96 let _ = match value {
97 None => lock.push(key, None),
98 Some(value) => lock.push(key, Some(value.clone())),
99 };
100 }
101 }
102 }
103 Ok(())
104 }
105
106 async fn del(&self, cond: &DelCacheQueryCond) -> Result<(), Box<dyn StdError>> {
107 let key = match cond.network_code {
108 None => {
109 if cond.unit_code.len() == 0 {
111 return Ok(());
112 }
113
114 cond.unit_code.to_string()
116 }
117 Some(code) => match cond.network_addr {
118 None => {
119 format!("{}.{}", cond.unit_code, code)
121 }
122 Some(addr) => {
123 let key = format!("{}.{}.{}", cond.unit_code, code, addr);
124 {
125 let mut lock = self.uldata.write().await;
126 lock.pop(&key);
127 }
128 return Ok(());
129 }
130 },
131 };
132 {
133 let mut lock = self.uldata.write().await;
134 loop {
135 let mut rm_key = None;
136 for (k, _) in lock.iter() {
137 if k.starts_with(key.as_str()) {
138 rm_key = Some(k.clone());
139 break;
140 }
141 }
142 match rm_key {
143 None => break,
144 Some(key) => {
145 let _ = lock.pop(&key);
146 }
147 }
148 }
149 }
150 Ok(())
151 }
152}
153
154impl Default for Options {
155 fn default() -> Self {
156 Options {
157 uldata_size: DEF_SIZE,
158 }
159 }
160}