gevulot_rs/
worker_client.rs

1use std::sync::Arc;
2use tokio::sync::RwLock;
3
4use crate::{
5    base_client::BaseClient,
6    error::{Error, Result},
7    proto::{
8        cosmos::base::query::v1beta1::PageRequest,
9        gevulot::gevulot::{
10            MsgAnnounceWorkerExit, MsgAnnounceWorkerExitResponse, MsgCreateWorker,
11            MsgCreateWorkerResponse, MsgDeleteWorker, MsgDeleteWorkerResponse, MsgUpdateWorker,
12            MsgUpdateWorkerResponse, QueryAllWorkerRequest,
13        },
14    },
15};
16
17/// Default page size for pagination.
18const PAGE_SIZE: u64 = 100;
19
20/// Client for managing workers in the Gevulot system.
21#[derive(Debug, Clone)]
22pub struct WorkerClient {
23    base_client: Arc<RwLock<BaseClient>>,
24}
25
26impl WorkerClient {
27    /// Creates a new instance of WorkerClient.
28    ///
29    /// # Arguments
30    ///
31    /// * `base_client` - An Arc-wrapped RwLock of the BaseClient.
32    ///
33    /// # Returns
34    ///
35    /// A new instance of WorkerClient.
36    pub fn new(base_client: Arc<RwLock<BaseClient>>) -> Self {
37        Self { base_client }
38    }
39
40    /// Lists all workers.
41    ///
42    /// # Returns
43    ///
44    /// A Result containing a vector of workers or an error.
45    ///
46    /// # Errors
47    ///
48    /// This function will return an error if the request to the Gevulot client fails.
49    pub async fn list(&mut self) -> Result<Vec<crate::proto::gevulot::gevulot::Worker>> {
50        let mut all_workers = Vec::new();
51        let mut next_key: Option<Vec<u8>> = None;
52
53        loop {
54            // Construct request with pagination for the current page.
55            let pagination = Some(PageRequest {
56                key: next_key.unwrap_or_default(),
57                limit: PAGE_SIZE,
58                ..Default::default()
59            });
60            let request = QueryAllWorkerRequest { pagination };
61
62            let response = self
63                .base_client
64                .write()
65                .await
66                .gevulot_client
67                .worker_all(request)
68                .await?;
69
70            let inner = response.into_inner();
71            all_workers.extend(inner.worker);
72
73            // Handle next page.
74            next_key = inner.pagination.and_then(|p| {
75                if p.next_key.is_empty() {
76                    None
77                } else {
78                    Some(p.next_key)
79                }
80            });
81            if next_key.is_none() {
82                break;
83            }
84        }
85
86        Ok(all_workers)
87    }
88
89    /// Gets a worker by its ID.
90    ///
91    /// # Arguments
92    ///
93    /// * `id` - The ID of the worker to retrieve.
94    ///
95    /// # Returns
96    ///
97    /// A Result containing the worker or an error.
98    ///
99    /// # Errors
100    ///
101    /// This function will return an error if the worker is not found or if the request to the Gevulot client fails.
102    pub async fn get(&mut self, id: &str) -> Result<crate::proto::gevulot::gevulot::Worker> {
103        let request = crate::proto::gevulot::gevulot::QueryGetWorkerRequest { id: id.to_owned() };
104        let response = self
105            .base_client
106            .write()
107            .await
108            .gevulot_client
109            .worker(request)
110            .await?;
111        response.into_inner().worker.ok_or(Error::NotFound)
112    }
113
114    /// Creates a new worker.
115    ///
116    /// # Arguments
117    ///
118    /// * `msg` - The message containing the worker details.
119    ///
120    /// # Returns
121    ///
122    /// A Result containing the response or an error.
123    ///
124    /// # Errors
125    ///
126    /// This function will return an error if the request to the Gevulot client fails.
127    pub async fn create(&mut self, msg: MsgCreateWorker) -> Result<MsgCreateWorkerResponse> {
128        let resp: MsgCreateWorkerResponse = self
129            .base_client
130            .write()
131            .await
132            .send_msg_sync(msg, "")
133            .await?;
134        Ok(resp)
135    }
136
137    /// Updates a worker.
138    ///
139    /// # Arguments
140    ///
141    /// * `msg` - The message containing the worker details to update.
142    ///
143    /// # Returns
144    ///
145    /// A Result containing the response or an error.
146    ///
147    /// # Errors
148    ///
149    /// This function will return an error if the request to the Gevulot client fails.
150    pub async fn update(&mut self, msg: MsgUpdateWorker) -> Result<MsgUpdateWorkerResponse> {
151        let resp: MsgUpdateWorkerResponse = self
152            .base_client
153            .write()
154            .await
155            .send_msg_sync(msg, "")
156            .await?;
157        Ok(resp)
158    }
159
160    /// Deletes a worker.
161    ///
162    /// # Arguments
163    ///
164    /// * `msg` - The message containing the worker ID to delete.
165    ///
166    /// # Returns
167    ///
168    /// A Result containing the response or an error.
169    ///
170    /// # Errors
171    ///
172    /// This function will return an error if the request to the Gevulot client fails.
173    pub async fn delete(&mut self, msg: MsgDeleteWorker) -> Result<MsgDeleteWorkerResponse> {
174        let resp: MsgDeleteWorkerResponse = self
175            .base_client
176            .write()
177            .await
178            .send_msg_sync(msg, "")
179            .await?;
180        Ok(resp)
181    }
182
183    /// Announces a worker's exit.
184    ///
185    /// # Arguments
186    ///
187    /// * `msg` - The message containing the worker ID to announce exit.
188    ///
189    /// # Returns
190    ///
191    /// A Result containing the response or an error.
192    ///
193    /// # Errors
194    ///
195    /// This function will return an error if the request to the Gevulot client fails.
196    pub async fn announce_exit(
197        &mut self,
198        msg: MsgAnnounceWorkerExit,
199    ) -> Result<MsgAnnounceWorkerExitResponse> {
200        let resp: MsgAnnounceWorkerExitResponse = self
201            .base_client
202            .write()
203            .await
204            .send_msg_sync(msg, "")
205            .await?;
206        Ok(resp)
207    }
208}