1use super::{FileLock, SparseIndex};
2use crate::{Error, IndexKrate, KrateName};
3pub use reqwest::blocking::Client;
4pub use reqwest::Client as AsyncClient;
5use std::collections::{BTreeMap, BTreeSet};
6
/// Pairs a [`SparseIndex`] with a blocking `reqwest` [`Client`] so that crate
/// index entries can be requested from a remote.
pub struct RemoteSparseIndex {
    /// The index used to create remote requests, parse their responses and
    /// read cached entries
    pub index: SparseIndex,
    /// The blocking HTTP client used to send the requests
    pub client: Client,
}
14
15impl RemoteSparseIndex {
16 #[inline]
19 pub fn new(index: SparseIndex, client: Client) -> Self {
20 Self { index, client }
21 }
22
23 pub fn krate(
30 &self,
31 name: KrateName<'_>,
32 write_cache_entry: bool,
33 lock: &FileLock,
34 ) -> Result<Option<IndexKrate>, Error> {
35 let req = self.index.make_remote_request(name, None, lock)?;
36 let (
37 http::request::Parts {
38 method,
39 uri,
40 version,
41 headers,
42 ..
43 },
44 _,
45 ) = req.into_parts();
46
47 let mut req = self.client.request(method, uri.to_string());
48 req = req.version(version);
49 req = req.headers(headers);
50
51 let res = self.client.execute(req.build()?)?;
52
53 let mut builder = http::Response::builder()
54 .status(res.status())
55 .version(res.version());
56
57 builder
58 .headers_mut()
59 .unwrap()
60 .extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
61
62 let body = res.bytes()?;
63 let res = builder.body(body.to_vec())?;
64
65 self.index
66 .parse_remote_response(name, res, write_cache_entry, lock)
67 }
68
69 #[inline]
75 pub fn cached_krate(
76 &self,
77 name: KrateName<'_>,
78 lock: &FileLock,
79 ) -> Result<Option<IndexKrate>, Error> {
80 self.index.cached_krate(name, lock)
81 }
82
83 #[inline]
88 pub fn krates(
89 &self,
90 mut krates: BTreeSet<String>,
91 write_cache_entries: bool,
92 lock: &FileLock,
93 ) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
94 let Some(prep_krate) = krates.pop_last() else {
95 return Default::default();
96 };
97
98 let prep = || {
99 let name = prep_krate.as_str().try_into()?;
100 self.krate(name, write_cache_entries, lock)
101 };
102
103 let prep_krate_res = prep();
104
105 use rayon::prelude::*;
106 let mut results: BTreeMap<_, _> = krates
107 .into_par_iter()
108 .map(|kname| {
109 let res = || {
110 let name = kname.as_str().try_into()?;
111 self.krate(name, write_cache_entries, lock)
112 };
113 let res = res();
114 (kname, res)
115 })
116 .collect();
117
118 results.insert(prep_krate, prep_krate_res);
119 results
120 }
121}
122
/// Pairs a [`SparseIndex`] with an async `reqwest` client ([`AsyncClient`])
/// so that crate index entries can be requested from a remote without
/// blocking.
pub struct AsyncRemoteSparseIndex {
    /// The index used to create remote requests, parse their responses and
    /// read cached entries
    pub index: SparseIndex,
    /// The async HTTP client used to send the requests
    pub client: AsyncClient,
}
130
131impl AsyncRemoteSparseIndex {
132 #[inline]
135 pub fn new(index: SparseIndex, client: AsyncClient) -> Self {
136 Self { index, client }
137 }
138
139 pub async fn krate_async(
141 &self,
142 name: KrateName<'_>,
143 write_cache_entry: bool,
144 lock: &FileLock,
145 ) -> Result<Option<IndexKrate>, Error> {
146 let req = self.index.make_remote_request(name, None, lock)?;
147
148 let (
149 http::request::Parts {
150 method,
151 uri,
152 version,
153 headers,
154 ..
155 },
156 _,
157 ) = req.into_parts();
158
159 let mut req = self.client.request(method, uri.to_string());
160 req = req.version(version);
161 req = req.headers(headers);
162
163 let res = Self::exec_request(&self.client, req.build()?).await?;
164
165 self.index
166 .parse_remote_response(name, res, write_cache_entry, lock)
167 }
168
169 async fn exec_request(
170 client: &AsyncClient,
171 req: reqwest::Request,
172 ) -> Result<http::Response<Vec<u8>>, Error> {
173 let res = loop {
175 let reqc = req.try_clone().unwrap();
176 let res = client.execute(reqc).await;
177
178 match res {
179 Err(err) if err.is_connect() || err.is_timeout() || err.is_request() => continue,
180 Err(err) => return Err(err.into()),
181 Ok(res) => break res,
182 }
183 };
184
185 let mut builder = http::Response::builder()
186 .status(res.status())
187 .version(res.version());
188
189 builder
190 .headers_mut()
191 .unwrap()
192 .extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
193
194 let body = res.bytes().await?;
195 Ok(builder.body(body.to_vec())?)
196 }
197
198 #[inline]
204 pub fn cached_krate(
205 &self,
206 name: KrateName<'_>,
207 lock: &FileLock,
208 ) -> Result<Option<IndexKrate>, Error> {
209 self.index.cached_krate(name, lock)
210 }
211
212 pub async fn krates(
226 &self,
227 mut krates: BTreeSet<String>,
228 write_cache_entries: bool,
229 individual_timeout: Option<std::time::Duration>,
230 lock: &FileLock,
231 ) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
232 let Some(prep_krate) = krates.pop_last() else {
233 return Default::default();
234 };
235
236 let create_req = |kname: &str| -> Result<reqwest::Request, Error> {
237 let name = kname.try_into()?;
238 let req = self.index.make_remote_request(name, None, lock)?;
239
240 let (
241 http::request::Parts {
242 method,
243 uri,
244 version,
245 headers,
246 ..
247 },
248 _,
249 ) = req.into_parts();
250
251 let mut req = self.client.request(method, uri.to_string());
252 req = req.version(version);
253 req = req.headers(headers);
254
255 Ok(req.build()?)
256 };
257
258 let mut results = BTreeMap::new();
259
260 {
261 let result;
262 match create_req(&prep_krate) {
263 Ok(req) => match Self::exec_request(&self.client, req).await {
264 Ok(res) => {
265 result = self.index.parse_remote_response(
266 prep_krate.as_str().try_into().unwrap(),
267 res,
268 write_cache_entries,
269 lock,
270 );
271 }
272 Err(err) => result = Err(err),
273 },
274 Err(err) => result = Err(err),
275 }
276
277 results.insert(prep_krate, result);
278 }
279
280 let mut tasks = tokio::task::JoinSet::new();
281
282 for kname in krates {
283 match create_req(kname.as_str()) {
284 Ok(req) => {
285 let client = self.client.clone();
286 tasks.spawn(async move {
287 let res = if let Some(to) = individual_timeout {
288 match tokio::time::timeout(to, Self::exec_request(&client, req)).await {
289 Ok(res) => res,
290 Err(_) => Err(Error::Http(crate::HttpError::Timeout)),
291 }
292 } else {
293 Self::exec_request(&client, req).await
294 };
295
296 (kname, res)
297 });
298 }
299 Err(err) => {
300 results.insert(kname, Err(err));
301 }
302 }
303 }
304
305 let (tx, rx) = crossbeam_channel::unbounded();
306 while let Some(res) = tasks.join_next().await {
307 let Ok(res) = res else {
308 continue;
309 };
310 let _ = tx.send(res);
311 }
312
313 drop(tx);
314
315 let results = std::sync::Mutex::new(results);
316 rayon::scope(|s| {
317 while let Ok((kname, res)) = rx.recv() {
318 s.spawn(|_s| {
319 let res = res.and_then(|res| {
320 let name = kname
321 .as_str()
322 .try_into()
323 .expect("this was already validated");
324 self.index
325 .parse_remote_response(name, res, write_cache_entries, lock)
326 });
327
328 results.lock().unwrap().insert(kname, res);
329 });
330 }
331 });
332
333 results.into_inner().unwrap()
334 }
335
336 pub fn krates_blocking(
341 &self,
342 krates: BTreeSet<String>,
343 write_cache_entries: bool,
344 individual_timeout: Option<std::time::Duration>,
345 lock: &FileLock,
346 ) -> Result<BTreeMap<String, Result<Option<IndexKrate>, Error>>, tokio::runtime::TryCurrentError>
347 {
348 let current = tokio::runtime::Handle::try_current()?;
349 Ok(current.block_on(async {
350 self.krates(krates, write_cache_entries, individual_timeout, lock)
351 .await
352 }))
353 }
354}
355
356impl From<reqwest::Error> for Error {
357 #[inline]
358 fn from(e: reqwest::Error) -> Self {
359 Self::Http(crate::HttpError::Reqwest(e))
360 }
361}
362
363impl From<http::Error> for Error {
364 #[inline]
365 fn from(e: http::Error) -> Self {
366 Self::Http(crate::HttpError::Http(e))
367 }
368}