1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
use super::{FileLock, SparseIndex};
use crate::{Error, IndexKrate, KrateName};
pub use reqwest::blocking::Client;
pub use reqwest::Client as AsyncClient;
use std::collections::{BTreeMap, BTreeSet};
/// Allows **blocking** access to a remote HTTP sparse registry index
pub struct RemoteSparseIndex {
/// The local index this remote is wrapping
pub index: SparseIndex,
/// The client used to make requests to the remote index
pub client: Client,
}
impl RemoteSparseIndex {
/// Creates a new [`Self`] that can access and write local cache entries,
/// and contact the remote index to retrieve the latest index information
#[inline]
pub fn new(index: SparseIndex, client: Client) -> Self {
Self { index, client }
}
/// Gets the latest index metadata for the crate
///
/// Network I/O is _always_ performed when calling this method, however the
/// response from the remote registry will be empty of contents other than
/// headers if the local cache entry for the crate is up to date with the
/// latest in the index
pub fn krate(
&self,
name: KrateName<'_>,
write_cache_entry: bool,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
let req = self.index.make_remote_request(name, None, lock)?;
let req = req.try_into()?;
let res = self.client.execute(req)?;
let mut builder = http::Response::builder()
.status(res.status())
.version(res.version());
builder
.headers_mut()
.unwrap()
.extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
let body = res.bytes()?;
let res = builder.body(body.to_vec())?;
self.index
.parse_remote_response(name, res, write_cache_entry, lock)
}
/// Attempts to read the locally cached crate information
///
/// This method does no network I/O unlike [`Self::krate`], but does not
/// guarantee that the cache information is up to date with the latest in
/// the remote index
#[inline]
pub fn cached_krate(
&self,
name: KrateName<'_>,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
self.index.cached_krate(name, lock)
}
/// Helper method for downloading multiple crates in parallel
///
/// Note that in most cases using [`AsyncRemoteSparseIndex::krates_blocking`]
/// will outperform this method, especially on lower core counts
#[inline]
pub fn krates(
&self,
mut krates: BTreeSet<String>,
write_cache_entries: bool,
lock: &FileLock,
) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
let Some(prep_krate) = krates.pop_last() else {
return Default::default();
};
let prep = || {
let name = prep_krate.as_str().try_into()?;
self.krate(name, write_cache_entries, lock)
};
let prep_krate_res = prep();
use rayon::prelude::*;
let mut results: BTreeMap<_, _> = krates
.into_par_iter()
.map(|kname| {
let res = || {
let name = kname.as_str().try_into()?;
self.krate(name, write_cache_entries, lock)
};
let res = res();
(kname, res)
})
.collect();
results.insert(prep_krate, prep_krate_res);
results
}
}
/// Allows **async** access to a remote HTTP sparse registry index
pub struct AsyncRemoteSparseIndex {
/// The local index this remote is wrapping
pub index: SparseIndex,
/// The client used to make requests to the remote index
pub client: AsyncClient,
}
impl AsyncRemoteSparseIndex {
/// Creates a new [`Self`] that can access and write local cache entries,
/// and contact the remote index to retrieve the latest index information
#[inline]
pub fn new(index: SparseIndex, client: AsyncClient) -> Self {
Self { index, client }
}
/// Async version of [`RemoteSparseIndex::krate`]
pub async fn krate_async(
&self,
name: KrateName<'_>,
write_cache_entry: bool,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
let req = self
.index
.make_remote_request(name, None, lock)?
.try_into()?;
let res = Self::exec_request(&self.client, req).await?;
self.index
.parse_remote_response(name, res, write_cache_entry, lock)
}
async fn exec_request(
client: &AsyncClient,
req: reqwest::Request,
) -> Result<http::Response<Vec<u8>>, Error> {
// This is unfortunate, but we always make a copy in case we need to retry
let res = loop {
let reqc = req.try_clone().unwrap();
let res = client.execute(reqc).await;
match res {
Err(err) if err.is_connect() || err.is_timeout() || err.is_request() => continue,
Err(err) => return Err(err.into()),
Ok(res) => break res,
}
};
let mut builder = http::Response::builder()
.status(res.status())
.version(res.version());
builder
.headers_mut()
.unwrap()
.extend(res.headers().iter().map(|(k, v)| (k.clone(), v.clone())));
let body = res.bytes().await?;
Ok(builder.body(body.to_vec())?)
}
/// Attempts to read the locally cached crate information
///
/// This method does no network I/O unlike [`Self::krate_async`], but does not
/// guarantee that the cache information is up to date with the latest in
/// the remote index
#[inline]
pub fn cached_krate(
&self,
name: KrateName<'_>,
lock: &FileLock,
) -> Result<Option<IndexKrate>, Error> {
self.index.cached_krate(name, lock)
}
/// Helper method for downloading multiples crates concurrently
///
/// This method will generally perform better than [`RemoteSparseIndex::krates`]
///
/// One notable difference with this method is that you can specify a maximum
/// duration that each individual krate request can take before it is timed out.
/// This is because certain [errors](https://github.com/seanmonstar/reqwest/issues/1748)
/// can occur when making many concurrent requests, which we detect and retry
/// automatically, but with (by default) no upper bound in number of
/// retries/time.
///
/// You can also run this entire operation with a single timeout if you wish,
/// via something like [`tokio::time::timeout`](https://docs.rs/tokio/latest/tokio/time/fn.timeout.html)
pub async fn krates(
&self,
mut krates: BTreeSet<String>,
write_cache_entries: bool,
individual_timeout: Option<std::time::Duration>,
lock: &FileLock,
) -> BTreeMap<String, Result<Option<IndexKrate>, Error>> {
let Some(prep_krate) = krates.pop_last() else {
return Default::default();
};
let create_req = |kname: &str| -> Result<http::Request<&'static [u8]>, Error> {
let name = kname.try_into()?;
self.index.make_remote_request(name, None, lock)
};
let mut results = BTreeMap::new();
{
let result;
match create_req(&prep_krate).and_then(|req| Ok(req.try_into()?)) {
Ok(req) => match Self::exec_request(&self.client, req).await {
Ok(res) => {
result = self.index.parse_remote_response(
prep_krate.as_str().try_into().unwrap(),
res,
write_cache_entries,
lock,
);
}
Err(err) => result = Err(err),
},
Err(err) => result = Err(err),
}
results.insert(prep_krate, result);
}
let mut tasks = tokio::task::JoinSet::new();
for kname in krates {
match kname.as_str().try_into().and_then(|name| {
Ok(self
.index
.make_remote_request(name, None, lock)?
.try_into()?)
}) {
Ok(req) => {
let client = self.client.clone();
tasks.spawn(async move {
let res = if let Some(to) = individual_timeout {
match tokio::time::timeout(to, Self::exec_request(&client, req)).await {
Ok(res) => res,
Err(_) => Err(Error::Http(crate::HttpError::Timeout)),
}
} else {
Self::exec_request(&client, req).await
};
(kname, res)
});
}
Err(err) => {
results.insert(kname, Err(err));
}
}
}
let (tx, rx) = crossbeam_channel::unbounded();
while let Some(res) = tasks.join_next().await {
let Ok(res) = res else {
continue;
};
let _ = tx.send(res);
}
drop(tx);
let results = std::sync::Mutex::new(results);
rayon::scope(|s| {
while let Ok((kname, res)) = rx.recv() {
s.spawn(|_s| {
let res = res.and_then(|res| {
let name = kname
.as_str()
.try_into()
.expect("this was already validated");
self.index
.parse_remote_response(name, res, write_cache_entries, lock)
});
results.lock().unwrap().insert(kname, res);
});
}
});
results.into_inner().unwrap()
}
/// A non-async version of [`Self::krates`]
///
/// Using this method requires that there is an active tokio runtime as
/// described [here](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.current)
pub fn krates_blocking(
&self,
krates: BTreeSet<String>,
write_cache_entries: bool,
individual_timeout: Option<std::time::Duration>,
lock: &FileLock,
) -> Result<BTreeMap<String, Result<Option<IndexKrate>, Error>>, tokio::runtime::TryCurrentError>
{
let current = tokio::runtime::Handle::try_current()?;
Ok(current.block_on(async {
self.krates(krates, write_cache_entries, individual_timeout, lock)
.await
}))
}
}
impl From<reqwest::Error> for Error {
#[inline]
fn from(e: reqwest::Error) -> Self {
Self::Http(crate::HttpError::Reqwest(e))
}
}
impl From<http::Error> for Error {
#[inline]
fn from(e: http::Error) -> Self {
Self::Http(crate::HttpError::Http(e))
}
}