lunatic_distributed/control/
client.rs1use anyhow::{Context, Result};
2use dashmap::DashMap;
3use lunatic_control::api::*;
4use lunatic_control::NodeInfo;
5use lunatic_process::runtimes::RawWasm;
6use reqwest::{Client as HttpClient, Url};
7use serde::{de::DeserializeOwned, Serialize};
8use std::{
9 collections::HashMap,
10 net::SocketAddr,
11 sync::{atomic, atomic::AtomicU64, Arc, RwLock},
12 time::Duration,
13};
14
15#[derive(Clone)]
16pub struct Client {
17 inner: Arc<InnerClient>,
18}
19
20pub struct InnerClient {
21 reg: Registration,
22 node_id: u64,
23 http_client: HttpClient,
24 next_message_id: AtomicU64,
25 next_query_id: AtomicU64,
26 node_queries: DashMap<u64, Vec<u64>>,
27 nodes: DashMap<u64, NodeInfo>,
28 node_ids: RwLock<Vec<u64>>,
29}
30
31impl Client {
32 pub async fn new(
33 http_client: HttpClient,
34 reg: Registration,
35 node_address: SocketAddr,
36 attributes: HashMap<String, String>,
37 ) -> Result<Self> {
38 let node_id = Self::start(
39 &http_client,
40 ®,
41 NodeStart {
42 node_address,
43 attributes,
44 },
45 )
46 .await?;
47
48 let client = Client {
49 inner: Arc::new(InnerClient {
50 reg,
51 node_id,
52 http_client,
53 next_message_id: AtomicU64::new(1),
54 node_queries: DashMap::new(),
55 next_query_id: AtomicU64::new(1),
56 nodes: Default::default(),
57 node_ids: Default::default(),
58 }),
59 };
60
61 tokio::task::spawn(refresh_nodes_task(client.clone()));
62 client.refresh_nodes().await?;
63
64 Ok(client)
65 }
66
67 pub async fn register(
68 http_client: &HttpClient,
69 control_url: Url,
70 node_name: uuid::Uuid,
71 csr_pem: String,
72 ) -> Result<Registration> {
73 let reg = Register { node_name, csr_pem };
74 Self::send_registration(http_client, control_url, reg).await
75 }
76
77 pub fn reg(&self) -> Registration {
78 self.inner.reg.clone()
79 }
80
81 pub fn node_id(&self) -> u64 {
82 self.inner.node_id
83 }
84
85 pub fn next_message_id(&self) -> u64 {
86 self.inner
87 .next_message_id
88 .fetch_add(1, atomic::Ordering::Relaxed)
89 }
90
91 pub fn next_query_id(&self) -> u64 {
92 self.inner
93 .next_query_id
94 .fetch_add(1, atomic::Ordering::Relaxed)
95 }
96
97 async fn send_registration(
98 client: &HttpClient,
99 url: Url,
100 reg: Register,
101 ) -> Result<Registration> {
102 let resp: Registration = client
103 .post(url)
104 .json(®)
105 .send()
106 .await
107 .with_context(|| "Error sending HTTP registration request.")?
108 .error_for_status()
109 .with_context(|| "HTTP registration request returned an error response.")?
110 .json()
111 .await
112 .with_context(|| "Error parsing the registration request JSON.")?;
113 Ok(resp)
114 }
115
116 async fn start(client: &HttpClient, reg: &Registration, start: NodeStart) -> Result<u64> {
117 let resp: NodeStarted = client
118 .post(®.urls.node_started)
119 .json(&start)
120 .bearer_auth(®.authentication_token)
121 .header(
122 "x-lunatic-node-name",
123 ®.node_name.hyphenated().to_string(),
124 )
125 .send()
126 .await?
127 .json()
128 .await?;
129 Ok(resp.node_id as u64)
130 }
131
132 pub async fn get<T: DeserializeOwned>(&self, url: &str, query: Option<&str>) -> Result<T> {
133 let mut url: Url = url.parse()?;
134 url.set_query(query);
135
136 let resp: T = self
137 .inner
138 .http_client
139 .get(url.clone())
140 .bearer_auth(&self.inner.reg.authentication_token)
141 .header(
142 "x-lunatic-node-name",
143 &self.inner.reg.node_name.hyphenated().to_string(),
144 )
145 .send()
146 .await
147 .with_context(|| format!("Error sending HTTP GET request: {}.", &url))?
148 .error_for_status()
149 .with_context(|| format!("HTTP GET request returned an error response: {}", &url))?
150 .json()
151 .await
152 .with_context(|| format!("Error parsing the HTTP GET request JSON: {}", &url))?;
153
154 Ok(resp)
155 }
156
157 pub async fn post<T: Serialize, R: DeserializeOwned>(&self, url: &str, data: T) -> Result<R> {
158 let url: Url = url.parse()?;
159
160 let resp: R = self
161 .inner
162 .http_client
163 .post(url.clone())
164 .json(&data)
165 .bearer_auth(&self.inner.reg.authentication_token)
166 .header(
167 "x-lunatic-node-name",
168 &self.inner.reg.node_name.hyphenated().to_string(),
169 )
170 .send()
171 .await
172 .with_context(|| format!("Error sending HTTP POST request: {}.", &url))?
173 .error_for_status()
174 .with_context(|| format!("HTTP POST request returned an error response: {}", &url))?
175 .json()
176 .await
177 .with_context(|| format!("Error parsing the HTTP POST request JSON: {}", &url))?;
178
179 Ok(resp)
180 }
181
182 pub async fn upload<R: DeserializeOwned>(&self, url: &str, body: Vec<u8>) -> Result<R> {
183 let url: Url = url.parse()?;
184
185 let resp: R = self
186 .inner
187 .http_client
188 .post(url.clone())
189 .body(body)
190 .bearer_auth(&self.inner.reg.authentication_token)
191 .header(
192 "x-lunatic-node-name",
193 &self.inner.reg.node_name.hyphenated().to_string(),
194 )
195 .send()
196 .await
197 .with_context(|| format!("Error sending HTTP POST request: {}.", &url))?
198 .error_for_status()
199 .with_context(|| format!("HTTP POST request returned an error response: {}", &url))
200 .map_err(|e| dbg!(e))?
201 .json()
202 .await
203 .with_context(|| format!("Error parsing the HTTP POST request JSON: {}", &url))?;
204
205 Ok(resp)
206 }
207
208 pub async fn refresh_nodes(&self) -> Result<()> {
209 let resp: NodesList = self.get(&self.inner.reg.urls.nodes, None).await?;
210 let mut node_ids = vec![];
211 for node in resp.nodes {
212 let id = node.id;
213 node_ids.push(id);
214 if !self.inner.nodes.contains_key(&id) {
215 self.inner.nodes.insert(id, node);
216 }
217 }
218 if let Ok(mut self_node_ids) = self.inner.node_ids.write() {
219 *self_node_ids = node_ids;
220 }
221 Ok(())
222 }
223
224 pub async fn notify_node_stopped(&self) -> Result<()> {
225 self.post(&self.inner.reg.urls.node_stopped, ()).await?;
226 Ok(())
227 }
228
229 pub fn node_info(&self, node_id: u64) -> Option<NodeInfo> {
230 self.inner.nodes.get(&node_id).map(|e| e.clone())
231 }
232
233 pub fn node_ids(&self) -> Vec<u64> {
234 self.inner.node_ids.read().unwrap().clone()
235 }
236
237 pub async fn lookup_nodes(&self, query: &str) -> Result<(u64, usize)> {
238 let resp: NodesList = self
239 .get(&self.inner.reg.urls.get_nodes, Some(query))
240 .await?;
241 let nodes: Vec<u64> = resp.nodes.into_iter().map(move |v| v.id).collect();
242 let nodes_count = nodes.len();
243 let query_id = self.next_query_id();
244 self.inner.node_queries.insert(query_id, nodes);
245 Ok((query_id, nodes_count))
246 }
247
248 pub fn query_result(&self, query_id: &u64) -> Option<(u64, Vec<u64>)> {
249 self.inner.node_queries.remove(query_id)
250 }
251
252 pub fn node_count(&self) -> usize {
253 self.inner.node_ids.read().unwrap().len()
254 }
255
256 pub async fn get_module(&self, module_id: u64, environment_id: u64) -> Result<Vec<u8>> {
257 log::info!("Get module {module_id}");
258 let url = self
259 .inner
260 .reg
261 .urls
262 .get_module
263 .replace("{id}", &module_id.to_string());
264 let query = format!("env_id={environment_id}");
265 let resp: ModuleBytes = self.get(&url, Some(&query)).await?;
266 Ok(resp.bytes)
267 }
268
269 pub async fn add_module(&self, module: Vec<u8>) -> Result<RawWasm> {
270 let url = &self.inner.reg.urls.add_module;
271 let resp: ModuleId = self.upload(url, module.clone()).await?;
272 Ok(RawWasm::new(Some(resp.module_id), module))
273 }
274}
275
276async fn refresh_nodes_task(client: Client) -> Result<()> {
277 loop {
278 client.refresh_nodes().await.ok();
279 tokio::time::sleep(Duration::from_secs(5)).await;
280 }
281}