lunatic_distributed/control/
client.rs

1use anyhow::{Context, Result};
2use dashmap::DashMap;
3use lunatic_control::api::*;
4use lunatic_control::NodeInfo;
5use lunatic_process::runtimes::RawWasm;
6use reqwest::{Client as HttpClient, Url};
7use serde::{de::DeserializeOwned, Serialize};
8use std::{
9    collections::HashMap,
10    net::SocketAddr,
11    sync::{atomic, atomic::AtomicU64, Arc, RwLock},
12    time::Duration,
13};
14
15#[derive(Clone)]
16pub struct Client {
17    inner: Arc<InnerClient>,
18}
19
20pub struct InnerClient {
21    reg: Registration,
22    node_id: u64,
23    http_client: HttpClient,
24    next_message_id: AtomicU64,
25    next_query_id: AtomicU64,
26    node_queries: DashMap<u64, Vec<u64>>,
27    nodes: DashMap<u64, NodeInfo>,
28    node_ids: RwLock<Vec<u64>>,
29}
30
31impl Client {
32    pub async fn new(
33        http_client: HttpClient,
34        reg: Registration,
35        node_address: SocketAddr,
36        attributes: HashMap<String, String>,
37    ) -> Result<Self> {
38        let node_id = Self::start(
39            &http_client,
40            &reg,
41            NodeStart {
42                node_address,
43                attributes,
44            },
45        )
46        .await?;
47
48        let client = Client {
49            inner: Arc::new(InnerClient {
50                reg,
51                node_id,
52                http_client,
53                next_message_id: AtomicU64::new(1),
54                node_queries: DashMap::new(),
55                next_query_id: AtomicU64::new(1),
56                nodes: Default::default(),
57                node_ids: Default::default(),
58            }),
59        };
60
61        tokio::task::spawn(refresh_nodes_task(client.clone()));
62        client.refresh_nodes().await?;
63
64        Ok(client)
65    }
66
67    pub async fn register(
68        http_client: &HttpClient,
69        control_url: Url,
70        node_name: uuid::Uuid,
71        csr_pem: String,
72    ) -> Result<Registration> {
73        let reg = Register { node_name, csr_pem };
74        Self::send_registration(http_client, control_url, reg).await
75    }
76
77    pub fn reg(&self) -> Registration {
78        self.inner.reg.clone()
79    }
80
81    pub fn node_id(&self) -> u64 {
82        self.inner.node_id
83    }
84
85    pub fn next_message_id(&self) -> u64 {
86        self.inner
87            .next_message_id
88            .fetch_add(1, atomic::Ordering::Relaxed)
89    }
90
91    pub fn next_query_id(&self) -> u64 {
92        self.inner
93            .next_query_id
94            .fetch_add(1, atomic::Ordering::Relaxed)
95    }
96
97    async fn send_registration(
98        client: &HttpClient,
99        url: Url,
100        reg: Register,
101    ) -> Result<Registration> {
102        let resp: Registration = client
103            .post(url)
104            .json(&reg)
105            .send()
106            .await
107            .with_context(|| "Error sending HTTP registration request.")?
108            .error_for_status()
109            .with_context(|| "HTTP registration request returned an error response.")?
110            .json()
111            .await
112            .with_context(|| "Error parsing the registration request JSON.")?;
113        Ok(resp)
114    }
115
116    async fn start(client: &HttpClient, reg: &Registration, start: NodeStart) -> Result<u64> {
117        let resp: NodeStarted = client
118            .post(&reg.urls.node_started)
119            .json(&start)
120            .bearer_auth(&reg.authentication_token)
121            .header(
122                "x-lunatic-node-name",
123                &reg.node_name.hyphenated().to_string(),
124            )
125            .send()
126            .await?
127            .json()
128            .await?;
129        Ok(resp.node_id as u64)
130    }
131
132    pub async fn get<T: DeserializeOwned>(&self, url: &str, query: Option<&str>) -> Result<T> {
133        let mut url: Url = url.parse()?;
134        url.set_query(query);
135
136        let resp: T = self
137            .inner
138            .http_client
139            .get(url.clone())
140            .bearer_auth(&self.inner.reg.authentication_token)
141            .header(
142                "x-lunatic-node-name",
143                &self.inner.reg.node_name.hyphenated().to_string(),
144            )
145            .send()
146            .await
147            .with_context(|| format!("Error sending HTTP GET request: {}.", &url))?
148            .error_for_status()
149            .with_context(|| format!("HTTP GET request returned an error response: {}", &url))?
150            .json()
151            .await
152            .with_context(|| format!("Error parsing the HTTP GET request JSON: {}", &url))?;
153
154        Ok(resp)
155    }
156
157    pub async fn post<T: Serialize, R: DeserializeOwned>(&self, url: &str, data: T) -> Result<R> {
158        let url: Url = url.parse()?;
159
160        let resp: R = self
161            .inner
162            .http_client
163            .post(url.clone())
164            .json(&data)
165            .bearer_auth(&self.inner.reg.authentication_token)
166            .header(
167                "x-lunatic-node-name",
168                &self.inner.reg.node_name.hyphenated().to_string(),
169            )
170            .send()
171            .await
172            .with_context(|| format!("Error sending HTTP POST request: {}.", &url))?
173            .error_for_status()
174            .with_context(|| format!("HTTP POST request returned an error response: {}", &url))?
175            .json()
176            .await
177            .with_context(|| format!("Error parsing the HTTP POST request JSON: {}", &url))?;
178
179        Ok(resp)
180    }
181
182    pub async fn upload<R: DeserializeOwned>(&self, url: &str, body: Vec<u8>) -> Result<R> {
183        let url: Url = url.parse()?;
184
185        let resp: R = self
186            .inner
187            .http_client
188            .post(url.clone())
189            .body(body)
190            .bearer_auth(&self.inner.reg.authentication_token)
191            .header(
192                "x-lunatic-node-name",
193                &self.inner.reg.node_name.hyphenated().to_string(),
194            )
195            .send()
196            .await
197            .with_context(|| format!("Error sending HTTP POST request: {}.", &url))?
198            .error_for_status()
199            .with_context(|| format!("HTTP POST request returned an error response: {}", &url))
200            .map_err(|e| dbg!(e))?
201            .json()
202            .await
203            .with_context(|| format!("Error parsing the HTTP POST request JSON: {}", &url))?;
204
205        Ok(resp)
206    }
207
208    pub async fn refresh_nodes(&self) -> Result<()> {
209        let resp: NodesList = self.get(&self.inner.reg.urls.nodes, None).await?;
210        let mut node_ids = vec![];
211        for node in resp.nodes {
212            let id = node.id;
213            node_ids.push(id);
214            if !self.inner.nodes.contains_key(&id) {
215                self.inner.nodes.insert(id, node);
216            }
217        }
218        if let Ok(mut self_node_ids) = self.inner.node_ids.write() {
219            *self_node_ids = node_ids;
220        }
221        Ok(())
222    }
223
224    pub async fn notify_node_stopped(&self) -> Result<()> {
225        self.post(&self.inner.reg.urls.node_stopped, ()).await?;
226        Ok(())
227    }
228
229    pub fn node_info(&self, node_id: u64) -> Option<NodeInfo> {
230        self.inner.nodes.get(&node_id).map(|e| e.clone())
231    }
232
233    pub fn node_ids(&self) -> Vec<u64> {
234        self.inner.node_ids.read().unwrap().clone()
235    }
236
237    pub async fn lookup_nodes(&self, query: &str) -> Result<(u64, usize)> {
238        let resp: NodesList = self
239            .get(&self.inner.reg.urls.get_nodes, Some(query))
240            .await?;
241        let nodes: Vec<u64> = resp.nodes.into_iter().map(move |v| v.id).collect();
242        let nodes_count = nodes.len();
243        let query_id = self.next_query_id();
244        self.inner.node_queries.insert(query_id, nodes);
245        Ok((query_id, nodes_count))
246    }
247
248    pub fn query_result(&self, query_id: &u64) -> Option<(u64, Vec<u64>)> {
249        self.inner.node_queries.remove(query_id)
250    }
251
252    pub fn node_count(&self) -> usize {
253        self.inner.node_ids.read().unwrap().len()
254    }
255
256    pub async fn get_module(&self, module_id: u64, environment_id: u64) -> Result<Vec<u8>> {
257        log::info!("Get module {module_id}");
258        let url = self
259            .inner
260            .reg
261            .urls
262            .get_module
263            .replace("{id}", &module_id.to_string());
264        let query = format!("env_id={environment_id}");
265        let resp: ModuleBytes = self.get(&url, Some(&query)).await?;
266        Ok(resp.bytes)
267    }
268
269    pub async fn add_module(&self, module: Vec<u8>) -> Result<RawWasm> {
270        let url = &self.inner.reg.urls.add_module;
271        let resp: ModuleId = self.upload(url, module.clone()).await?;
272        Ok(RawWasm::new(Some(resp.module_id), module))
273    }
274}
275
276async fn refresh_nodes_task(client: Client) -> Result<()> {
277    loop {
278        client.refresh_nodes().await.ok();
279        tokio::time::sleep(Duration::from_secs(5)).await;
280    }
281}