// tame_index/index/sparse_remote.rs

1use super::{FileLock, SparseIndex};
2use crate::{Error, IndexKrate, KrateName};
3pub use reqwest::blocking::Client;
4pub use reqwest::Client as AsyncClient;
5use std::collections::{BTreeMap, BTreeSet};
6
7/// Allows **blocking** access to a remote HTTP sparse registry index
8pub struct RemoteSparseIndex {
9    /// The local index this remote is wrapping
10    pub index: SparseIndex,
11    /// The client used to make requests to the remote index
12    pub client: Client,
13}
14
15impl RemoteSparseIndex {
16    /// Creates a new [`Self`] that can access and write local cache entries,
17    /// and contact the remote index to retrieve the latest index information
18    #[inline]
19    pub fn new(index: SparseIndex, client: Client) -> Self {
20        Self { index, client }
21    }
22
23    /// Gets the latest index metadata for the crate
24    ///
25    /// Network I/O is _always_ performed when calling this method, however the
26    /// response from the remote registry will be empty of contents other than
27    /// headers if the local cache entry for the crate is up to date with the
28    /// latest in the index
29    pub fn krate(
30        &self,
31        name: KrateName<'_>,
32        write_cache_entry: bool,
33        lock: &FileLock,
34    ) -> Result<Option<IndexKrate>, Error> {
35        let req = self.index.make_remote_request(name, None, lock)?;
36        let (
37            http::request::Parts {
38                method,
39                uri,
40                version,
41                headers,
42                ..
43            },
44            _,
45        ) = req.into_parts();
46
47        let mut req = self.client.request(method, uri.to_string());
48        req = req.version(version);
49        req = req.headers(headers);
50
51        let res = self.client.execute(req.build()?)?;
52
53        let mut builder = http::Response::builder()
54            .status(res.status())
55            .version(res.version());
56
57        builder
58            .headers_mut()
59            .unwrap()
60            .extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
61
62        let body = res.bytes()?;
63        let res = builder.body(body.to_vec())?;
64
65        self.index
66            .parse_remote_response(name, res, write_cache_entry, lock)
67    }
68
69    /// Attempts to read the locally cached crate information
70    ///
71    /// This method does no network I/O unlike [`Self::krate`], but does not
72    /// guarantee that the cache information is up to date with the latest in
73    /// the remote index
74    #[inline]
75    pub fn cached_krate(
76        &self,
77        name: KrateName<'_>,
78        lock: &FileLock,
79    ) -> Result<Option<IndexKrate>, Error> {
80        self.index.cached_krate(name, lock)
81    }
82
83    /// Helper method for downloading multiple crates in parallel
84    ///
85    /// Note that in most cases using [`AsyncRemoteSparseIndex::krates_blocking`]
86    /// will outperform this method, especially on lower core counts
87    #[inline]
88    pub fn krates(
89        &self,
90        mut krates: BTreeSet<String>,
91        write_cache_entries: bool,
92        lock: &FileLock,
93    ) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
94        let Some(prep_krate) = krates.pop_last() else {
95            return Default::default();
96        };
97
98        let prep = || {
99            let name = prep_krate.as_str().try_into()?;
100            self.krate(name, write_cache_entries, lock)
101        };
102
103        let prep_krate_res = prep();
104
105        use rayon::prelude::*;
106        let mut results: BTreeMap<_, _> = krates
107            .into_par_iter()
108            .map(|kname| {
109                let res = || {
110                    let name = kname.as_str().try_into()?;
111                    self.krate(name, write_cache_entries, lock)
112                };
113                let res = res();
114                (kname, res)
115            })
116            .collect();
117
118        results.insert(prep_krate, prep_krate_res);
119        results
120    }
121}
122
123/// Allows **async** access to a remote HTTP sparse registry index
124pub struct AsyncRemoteSparseIndex {
125    /// The local index this remote is wrapping
126    pub index: SparseIndex,
127    /// The client used to make requests to the remote index
128    pub client: AsyncClient,
129}
130
131impl AsyncRemoteSparseIndex {
132    /// Creates a new [`Self`] that can access and write local cache entries,
133    /// and contact the remote index to retrieve the latest index information
134    #[inline]
135    pub fn new(index: SparseIndex, client: AsyncClient) -> Self {
136        Self { index, client }
137    }
138
139    /// Async version of [`RemoteSparseIndex::krate`]
140    pub async fn krate_async(
141        &self,
142        name: KrateName<'_>,
143        write_cache_entry: bool,
144        lock: &FileLock,
145    ) -> Result<Option<IndexKrate>, Error> {
146        let req = self.index.make_remote_request(name, None, lock)?;
147
148        let (
149            http::request::Parts {
150                method,
151                uri,
152                version,
153                headers,
154                ..
155            },
156            _,
157        ) = req.into_parts();
158
159        let mut req = self.client.request(method, uri.to_string());
160        req = req.version(version);
161        req = req.headers(headers);
162
163        let res = Self::exec_request(&self.client, req.build()?).await?;
164
165        self.index
166            .parse_remote_response(name, res, write_cache_entry, lock)
167    }
168
169    async fn exec_request(
170        client: &AsyncClient,
171        req: reqwest::Request,
172    ) -> Result<http::Response<Vec<u8>>, Error> {
173        // This is unfortunate, but we always make a copy in case we need to retry
174        let res = loop {
175            let reqc = req.try_clone().unwrap();
176            let res = client.execute(reqc).await;
177
178            match res {
179                Err(err) if err.is_connect() || err.is_timeout() || err.is_request() => continue,
180                Err(err) => return Err(err.into()),
181                Ok(res) => break res,
182            }
183        };
184
185        let mut builder = http::Response::builder()
186            .status(res.status())
187            .version(res.version());
188
189        builder
190            .headers_mut()
191            .unwrap()
192            .extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
193
194        let body = res.bytes().await?;
195        Ok(builder.body(body.to_vec())?)
196    }
197
198    /// Attempts to read the locally cached crate information
199    ///
200    /// This method does no network I/O unlike [`Self::krate_async`], but does not
201    /// guarantee that the cache information is up to date with the latest in
202    /// the remote index
203    #[inline]
204    pub fn cached_krate(
205        &self,
206        name: KrateName<'_>,
207        lock: &FileLock,
208    ) -> Result<Option<IndexKrate>, Error> {
209        self.index.cached_krate(name, lock)
210    }
211
212    /// Helper method for downloading multiples crates concurrently
213    ///
214    /// This method will generally perform better than [`RemoteSparseIndex::krates`]
215    ///
216    /// One notable difference with this method is that you can specify a maximum
217    /// duration that each individual krate request can take before it is timed out.
218    /// This is because certain [errors](https://github.com/seanmonstar/reqwest/issues/1748)
219    /// can occur when making many concurrent requests, which we detect and retry
220    /// automatically, but with (by default) no upper bound in number of
221    /// retries/time.
222    ///
223    /// You can also run this entire operation with a single timeout if you wish,
224    /// via something like [`tokio::time::timeout`](https://docs.rs/tokio/latest/tokio/time/fn.timeout.html)
225    pub async fn krates(
226        &self,
227        mut krates: BTreeSet<String>,
228        write_cache_entries: bool,
229        individual_timeout: Option<std::time::Duration>,
230        lock: &FileLock,
231    ) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
232        let Some(prep_krate) = krates.pop_last() else {
233            return Default::default();
234        };
235
236        let create_req = |kname: &str| -> Result<reqwest::Request, Error> {
237            let name = kname.try_into()?;
238            let req = self.index.make_remote_request(name, None, lock)?;
239
240            let (
241                http::request::Parts {
242                    method,
243                    uri,
244                    version,
245                    headers,
246                    ..
247                },
248                _,
249            ) = req.into_parts();
250
251            let mut req = self.client.request(method, uri.to_string());
252            req = req.version(version);
253            req = req.headers(headers);
254
255            Ok(req.build()?)
256        };
257
258        let mut results = BTreeMap::new();
259
260        {
261            let result;
262            match create_req(&prep_krate) {
263                Ok(req) => match Self::exec_request(&self.client, req).await {
264                    Ok(res) => {
265                        result = self.index.parse_remote_response(
266                            prep_krate.as_str().try_into().unwrap(),
267                            res,
268                            write_cache_entries,
269                            lock,
270                        );
271                    }
272                    Err(err) => result = Err(err),
273                },
274                Err(err) => result = Err(err),
275            }
276
277            results.insert(prep_krate, result);
278        }
279
280        let mut tasks = tokio::task::JoinSet::new();
281
282        for kname in krates {
283            match create_req(kname.as_str()) {
284                Ok(req) => {
285                    let client = self.client.clone();
286                    tasks.spawn(async move {
287                        let res = if let Some(to) = individual_timeout {
288                            match tokio::time::timeout(to, Self::exec_request(&client, req)).await {
289                                Ok(res) => res,
290                                Err(_) => Err(Error::Http(crate::HttpError::Timeout)),
291                            }
292                        } else {
293                            Self::exec_request(&client, req).await
294                        };
295
296                        (kname, res)
297                    });
298                }
299                Err(err) => {
300                    results.insert(kname, Err(err));
301                }
302            }
303        }
304
305        let (tx, rx) = crossbeam_channel::unbounded();
306        while let Some(res) = tasks.join_next().await {
307            let Ok(res) = res else {
308                continue;
309            };
310            let _ = tx.send(res);
311        }
312
313        drop(tx);
314
315        let results = std::sync::Mutex::new(results);
316        rayon::scope(|s| {
317            while let Ok((kname, res)) = rx.recv() {
318                s.spawn(|_s| {
319                    let res = res.and_then(|res| {
320                        let name = kname
321                            .as_str()
322                            .try_into()
323                            .expect("this was already validated");
324                        self.index
325                            .parse_remote_response(name, res, write_cache_entries, lock)
326                    });
327
328                    results.lock().unwrap().insert(kname, res);
329                });
330            }
331        });
332
333        results.into_inner().unwrap()
334    }
335
336    /// A non-async version of [`Self::krates`]
337    ///
338    /// Using this method requires that there is an active tokio runtime as
339    /// described [here](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.current)
340    pub fn krates_blocking(
341        &self,
342        krates: BTreeSet<String>,
343        write_cache_entries: bool,
344        individual_timeout: Option<std::time::Duration>,
345        lock: &FileLock,
346    ) -> Result<BTreeMap<String, Result<Option<IndexKrate>, Error>>, tokio::runtime::TryCurrentError>
347    {
348        let current = tokio::runtime::Handle::try_current()?;
349        Ok(current.block_on(async {
350            self.krates(krates, write_cache_entries, individual_timeout, lock)
351                .await
352        }))
353    }
354}
355
356impl From<reqwest::Error> for Error {
357    #[inline]
358    fn from(e: reqwest::Error) -> Self {
359        Self::Http(crate::HttpError::Reqwest(e))
360    }
361}
362
363impl From<http::Error> for Error {
364    #[inline]
365    fn from(e: http::Error) -> Self {
366        Self::Http(crate::HttpError::Http(e))
367    }
368}