use crate::{
lock::{LockFileResolver, LockedPackageVersion},
progress::{ProgressBar, ProgressStyle},
terminal::{Colors, Terminal},
};
use anyhow::{bail, Context, Result};
use futures::{stream::FuturesUnordered, StreamExt};
use indexmap::IndexMap;
use semver::{Comparator, Op, Version, VersionReq};
use serde::{
de::{self, value::MapAccessDeserializer},
Deserialize, Serialize,
};
use std::{
collections::{hash_map, HashMap, HashSet},
fs,
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};
use url::Url;
use warg_client::{
storage::{ContentStorage, PackageInfo},
Config, FileSystemClient, StorageLockResult,
};
use warg_crypto::hash::AnyHash;
use warg_protocol::registry;
use wit_component::DecodedWasm;
use wit_parser::{PackageId, PackageName, Resolve, UnresolvedPackage, WorldId};
/// The registry name used when a dependency does not name a registry explicitly.
pub const DEFAULT_REGISTRY_NAME: &str = "default";
/// Finds the URL of the component registry with the given name.
///
/// When `name` is `None`, the default registry name is looked up. If the
/// default registry has no entry in `urls`, the `default` URL is used instead.
///
/// # Errors
///
/// Returns an error if a non-default registry is missing from `urls`, or if
/// the default registry is missing from `urls` and `default` is `None`.
pub fn find_url<'a>(
    name: Option<&str>,
    urls: &'a HashMap<String, Url>,
    default: Option<&'a str>,
) -> Result<&'a str> {
    let name = name.unwrap_or(DEFAULT_REGISTRY_NAME);

    // An explicit entry in the configuration always wins.
    if let Some(url) = urls.get(name) {
        return Ok(url.as_str());
    }

    // Only the default registry may fall back to the `default` URL.
    if name != DEFAULT_REGISTRY_NAME {
        bail!("component registry `{name}` does not exist in the configuration")
    }

    default.context("a default component registry has not been set")
}
/// Creates a file-system-backed registry client for the given registry URL.
///
/// A non-blocking attempt is made first. If the storage lock is not acquired,
/// a "Blocking" status naming the lock path is printed and the client is
/// created with `new_with_config` (presumably waiting for the lock — confirm
/// against the `warg_client` documentation).
///
/// # Errors
///
/// Returns an error if either client construction call fails or the status
/// message cannot be written to the terminal.
pub async fn create_client(
    config: &warg_client::Config,
    url: &str,
    terminal: &Terminal,
) -> Result<FileSystemClient> {
    let path = match FileSystemClient::try_new_with_config(Some(url), config, None).await? {
        StorageLockResult::Acquired(client) => return Ok(client),
        StorageLockResult::NotAcquired(path) => path,
    };

    terminal.status_with_color(
        "Blocking",
        format!("waiting for file lock on `{path}`", path = path.display()),
        Colors::Cyan,
    )?;

    let client = FileSystemClient::new_with_config(Some(url), config, None).await?;
    Ok(client)
}
/// A dependency declared by the user: either a registry package or a local path.
#[derive(Debug, Clone)]
pub enum Dependency {
/// A dependency on a package from a component registry.
Package(RegistryPackage),
/// A dependency on content located at a local file system path.
Local(PathBuf),
}
impl Serialize for Dependency {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
Self::Package(package) => {
if package.name.is_none() && package.registry.is_none() {
let version = package.version.to_string();
version.trim_start_matches('^').serialize(serializer)
} else {
#[derive(Serialize)]
struct Entry<'a> {
package: Option<&'a registry::PackageName>,
version: &'a str,
registry: Option<&'a str>,
}
Entry {
package: package.name.as_ref(),
version: package.version.to_string().trim_start_matches('^'),
registry: package.registry.as_deref(),
}
.serialize(serializer)
}
}
Self::Local(path) => {
#[derive(Serialize)]
struct Entry<'a> {
path: &'a PathBuf,
}
Entry { path }.serialize(serializer)
}
}
}
}
impl<'de> Deserialize<'de> for Dependency {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct Visitor;
impl<'de> de::Visitor<'de> for Visitor {
type Value = Dependency;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a string or a table")
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(Self::Value::Package(s.parse().map_err(de::Error::custom)?))
}
fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
#[derive(Default, Deserialize)]
#[serde(default, deny_unknown_fields)]
struct Entry {
path: Option<PathBuf>,
package: Option<registry::PackageName>,
version: Option<VersionReq>,
registry: Option<String>,
}
let entry = Entry::deserialize(MapAccessDeserializer::new(map))?;
match (entry.path, entry.package, entry.version, entry.registry) {
(Some(path), None, None, None) => Ok(Self::Value::Local(path)),
(None, name, Some(version), registry) => {
Ok(Self::Value::Package(RegistryPackage {
name,
version,
registry,
}))
}
(Some(_), None, Some(_), _) => Err(de::Error::custom(
"cannot specify both `path` and `version` fields in a dependency entry",
)),
(Some(_), None, None, Some(_)) => Err(de::Error::custom(
"cannot specify both `path` and `registry` fields in a dependency entry",
)),
(Some(_), Some(_), _, _) => Err(de::Error::custom(
"cannot specify both `path` and `package` fields in a dependency entry",
)),
(None, None, _, _) => Err(de::Error::missing_field("package")),
(None, Some(_), None, _) => Err(de::Error::missing_field("version")),
}
}
}
deserializer.deserialize_any(Visitor)
}
}
impl FromStr for Dependency {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self> {
Ok(Self::Package(s.parse()?))
}
}
/// A reference to a package in a component registry.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RegistryPackage {
/// The name of the package; when `None`, the resolver falls back to the
/// name of the dependency itself.
pub name: Option<registry::PackageName>,
/// The version requirement of the package.
pub version: VersionReq,
/// The name of the registry to use; when `None`, the resolver uses the
/// default registry name.
pub registry: Option<String>,
}
impl FromStr for RegistryPackage {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self> {
Ok(Self {
name: None,
version: s
.parse()
.with_context(|| format!("'{s}' is an invalid registry package version"))?,
registry: None,
})
}
}
/// The result of resolving a dependency against a component registry.
#[derive(Clone, Debug)]
pub struct RegistryResolution {
/// The name of the dependency that was resolved.
pub name: registry::PackageName,
/// The name of the package in the registry that the dependency refers to.
pub package: registry::PackageName,
/// The name of the registry used; `None` when the default registry was used.
pub registry: Option<String>,
/// The version requirement that was used to resolve the package.
pub requirement: VersionReq,
/// The package version that was resolved.
pub version: Version,
/// The digest of the resolved package content.
pub digest: AnyHash,
/// The path to the resolved package content.
pub path: PathBuf,
}
/// The result of resolving a dependency that refers to a local path.
#[derive(Clone, Debug)]
pub struct LocalResolution {
/// The name of the dependency that was resolved.
pub name: registry::PackageName,
/// The path of the local dependency.
pub path: PathBuf,
}
/// A resolved dependency, from either a registry or a local path.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum DependencyResolution {
/// The dependency was resolved from a component registry.
Registry(RegistryResolution),
/// The dependency was resolved from a local path.
Local(LocalResolution),
}
impl DependencyResolution {
pub fn name(&self) -> ®istry::PackageName {
match self {
Self::Registry(res) => &res.name,
Self::Local(res) => &res.name,
}
}
pub fn path(&self) -> &Path {
match self {
Self::Registry(res) => &res.path,
Self::Local(res) => &res.path,
}
}
pub fn version(&self) -> Option<&Version> {
match self {
Self::Registry(res) => Some(&res.version),
Self::Local(_) => None,
}
}
pub fn key(&self) -> Option<(®istry::PackageName, Option<&str>)> {
match self {
DependencyResolution::Registry(pkg) => Some((&pkg.package, pkg.registry.as_deref())),
DependencyResolution::Local(_) => None,
}
}
pub fn decode(&self) -> Result<DecodedDependency> {
if self.path().is_dir() {
return Ok(DecodedDependency::Wit {
resolution: self,
package: UnresolvedPackage::parse_dir(self.path()).with_context(|| {
format!(
"failed to parse dependency `{path}`",
path = self.path().display()
)
})?,
});
}
let bytes = fs::read(self.path()).with_context(|| {
format!(
"failed to read content of dependency `{name}` at path `{path}`",
name = self.name(),
path = self.path().display()
)
})?;
if &bytes[0..4] != b"\0asm" {
return Ok(DecodedDependency::Wit {
resolution: self,
package: UnresolvedPackage::parse(
self.path(),
std::str::from_utf8(&bytes).with_context(|| {
format!(
"dependency `{path}` is not UTF-8 encoded",
path = self.path().display()
)
})?,
)?,
});
}
Ok(DecodedDependency::Wasm {
resolution: self,
decoded: wit_component::decode(&bytes).with_context(|| {
format!(
"failed to decode content of dependency `{name}` at path `{path}`",
name = self.name(),
path = self.path().display()
)
})?,
})
}
}
/// The decoded content of a resolved dependency.
pub enum DecodedDependency<'a> {
/// The dependency was parsed from WIT source (a file or a directory).
Wit {
/// The resolution the content was decoded from.
resolution: &'a DependencyResolution,
/// The parsed, not yet resolved, WIT package.
package: UnresolvedPackage,
},
/// The dependency was decoded from a WebAssembly binary.
Wasm {
/// The resolution the content was decoded from.
resolution: &'a DependencyResolution,
/// The result of decoding the WebAssembly binary.
decoded: DecodedWasm,
},
}
impl<'a> DecodedDependency<'a> {
    /// Converts the decoded dependency into a [`Resolve`], the id of its
    /// package, and the WIT source files it was parsed from.
    ///
    /// The source file list is empty for dependencies decoded from a
    /// WebAssembly binary.
    ///
    /// # Panics
    ///
    /// Panics if a decoded component's world has no associated package.
    pub fn resolve(self) -> Result<(Resolve, PackageId, Vec<PathBuf>)> {
        let (resolve, pkg, source_files) = match self {
            Self::Wit { package, .. } => {
                // The source files must be captured before the package is
                // moved into the `Resolve`.
                let source_files: Vec<PathBuf> =
                    package.source_files().map(Path::to_path_buf).collect();
                let mut resolve = Resolve::new();
                let pkg = resolve.push(package)?;
                (resolve, pkg, source_files)
            }
            Self::Wasm {
                decoded: DecodedWasm::WitPackage(resolve, pkg),
                ..
            } => (resolve, pkg, Vec::new()),
            Self::Wasm {
                decoded: DecodedWasm::Component(resolve, world),
                ..
            } => {
                let pkg = resolve.worlds[world].package.unwrap();
                (resolve, pkg, Vec::new())
            }
        };

        Ok((resolve, pkg, source_files))
    }

    /// Returns the name of the package described by the decoded dependency.
    pub fn package_name(&self) -> &PackageName {
        match self {
            Self::Wasm { decoded, .. } => {
                let package = decoded.package();
                &decoded.resolve().packages[package].name
            }
            Self::Wit { package, .. } => &package.name,
        }
    }

    /// Converts the decoded dependency into the [`Resolve`] and world of a
    /// WebAssembly component.
    ///
    /// # Errors
    ///
    /// Returns an error if the dependency was not decoded from a component.
    pub fn into_component_world(self) -> Result<(Resolve, WorldId)> {
        if let Self::Wasm {
            decoded: DecodedWasm::Component(resolve, world),
            ..
        } = self
        {
            Ok((resolve, world))
        } else {
            bail!("dependency is not a WebAssembly component")
        }
    }
}
/// Resolves dependencies against component registries and local paths.
pub struct DependencyResolver<'a> {
/// The terminal used for status and progress output.
terminal: &'a Terminal,
/// The configured registry URLs, keyed by registry name.
registry_urls: &'a HashMap<String, Url>,
/// The warg client configuration used to create registry clients.
warg_config: &'a Config,
/// The optional lock file resolver consulted for locked package versions.
lock_file: Option<LockFileResolver<'a>>,
/// The registries in use, keyed by registry name; entries are created on demand.
registries: IndexMap<&'a str, Registry<'a>>,
/// The resolutions recorded so far, keyed by dependency name.
resolutions: HashMap<registry::PackageName, DependencyResolution>,
/// Whether registry updates and package downloads are permitted.
network_allowed: bool,
}
impl<'a> DependencyResolver<'a> {
pub fn new(
warg_config: &'a Config,
registry_urls: &'a HashMap<String, Url>,
lock_file: Option<LockFileResolver<'a>>,
terminal: &'a Terminal,
network_allowed: bool,
) -> Result<Self> {
Ok(DependencyResolver {
terminal,
registry_urls,
warg_config,
lock_file,
registries: Default::default(),
resolutions: Default::default(),
network_allowed,
})
}
/// Adds a dependency to the resolver.
///
/// A local dependency is recorded as resolved immediately. A registry
/// dependency is handed to its registry (creating the registry client on
/// first use) together with any matching entry from the lock file.
///
/// # Errors
///
/// Returns an error if the lock file lookup fails, the registry URL cannot
/// be found, the registry client cannot be created, or the registry fails
/// to add the dependency.
///
/// # Panics
///
/// Panics if a local dependency with the same name was already added.
pub async fn add_dependency(
    &mut self,
    name: &'a registry::PackageName,
    dependency: &'a Dependency,
) -> Result<()> {
    let package = match dependency {
        Dependency::Local(p) => {
            let resolution = DependencyResolution::Local(LocalResolution {
                name: name.clone(),
                path: p.clone(),
            });
            let prev = self.resolutions.insert(name.clone(), resolution);
            assert!(prev.is_none());
            return Ok(());
        }
        Dependency::Package(package) => package,
    };

    let registry_name = package.registry.as_deref().unwrap_or(DEFAULT_REGISTRY_NAME);
    // The package name defaults to the name of the dependency itself.
    let package_name = match &package.name {
        Some(explicit) => explicit.clone(),
        None => name.clone(),
    };

    // Consult the lock file, if any, for a previously locked version.
    let locked = match self.lock_file.as_ref() {
        Some(resolver) => resolver.resolve(registry_name, &package_name, &package.version)?,
        None => None,
    };

    let registry = match self.registries.entry(registry_name) {
        indexmap::map::Entry::Occupied(existing) => existing.into_mut(),
        indexmap::map::Entry::Vacant(vacant) => {
            let url = find_url(
                Some(registry_name),
                self.registry_urls,
                self.warg_config.home_url.as_deref(),
            )?;
            let client = create_client(self.warg_config, url, self.terminal).await?;
            vacant.insert(Registry {
                client: Arc::new(client),
                packages: HashMap::new(),
                dependencies: Vec::new(),
                upserts: HashSet::new(),
            })
        }
    };

    registry
        .add_dependency(name, package_name, &package.version, registry_name, locked)
        .await?;

    Ok(())
}
/// Resolves every dependency that was added to the resolver.
///
/// Package logs are updated first, then missing content is downloaded, and
/// the registry resolutions are merged with the already recorded ones.
///
/// # Panics
///
/// Panics if two resolutions share the same dependency name.
pub async fn resolve(self) -> Result<DependencyResolutionMap> {
    let terminal = self.terminal;
    let network_allowed = self.network_allowed;
    let mut registries = self.registries;
    let mut resolutions = self.resolutions;

    let downloads = Self::update_packages(&mut registries, terminal, network_allowed).await?;
    let resolved =
        Self::download_and_resolve(registries, downloads, terminal, network_allowed).await?;

    for resolution in resolved {
        let name = resolution.name().clone();
        let prev = resolutions.insert(name, resolution);
        assert!(prev.is_none());
    }

    Ok(resolutions)
}
/// Updates the package logs of every registry with pending upserts and
/// collects the package downloads that are still required.
///
/// Registries without upserts have their downloads computed immediately.
/// For the others, one task per registry is spawned to fetch the package
/// logs, and the downloads are computed as each task completes.
///
/// # Errors
///
/// Returns an error if an update is required while `network_allowed` is
/// `false`, if a spawned task cannot be joined, if fetching package logs
/// fails, or if computing the downloads for a registry fails.
async fn update_packages(
    registries: &mut IndexMap<&'a str, Registry<'a>>,
    terminal: &Terminal,
    network_allowed: bool,
) -> Result<DownloadMap<'a>> {
    let task_count = registries
        .iter()
        .filter(|(_, r)| !r.upserts.is_empty())
        .count();

    let mut progress = ProgressBar::with_style("Updating", ProgressStyle::Ratio, terminal);

    if task_count > 0 {
        if !network_allowed {
            bail!("a component registry update is required but network access is disabled");
        }

        terminal.status("Updating", "component registry package logs")?;
        progress.tick_now(0, task_count, "")?;
    }

    let mut downloads = DownloadMap::new();
    let mut futures = FuturesUnordered::new();
    for (index, (name, registry)) in registries.iter_mut().enumerate() {
        // Taking the upserts leaves the registry's set empty.
        let upserts = std::mem::take(&mut registry.upserts);
        if upserts.is_empty() {
            // Nothing to update for this registry; compute its downloads now.
            registry.add_downloads(name, &mut downloads).await?;
            continue;
        }

        log::info!("updating package logs for registry `{name}`");

        let client = registry.client.clone();
        // The registry index is carried through the task so the registry can
        // be found again when the task completes.
        futures.push(tokio::spawn(async move {
            (index, client.fetch_packages(upserts.iter()).await)
        }))
    }

    assert_eq!(futures.len(), task_count);

    let mut finished = 0;
    while let Some(res) = futures.next().await {
        let (index, res) = res.context("failed to join registry update task")?;
        let (name, registry) = registries
            .get_index_mut(index)
            .expect("out of bounds registry index");

        res.with_context(|| {
            format!("failed to update package logs for component registry `{name}`")
        })?;

        log::info!("package logs successfully updated for component registry `{name}`");
        finished += 1;
        // The message must be formatted explicitly: a plain string literal
        // would display the text `{name}` instead of the registry name.
        progress.tick_now(finished, task_count, &format!(": updated `{name}`"))?;
        registry.add_downloads(name, &mut downloads).await?;
    }

    assert_eq!(finished, task_count);

    progress.clear();

    Ok(downloads)
}
/// Downloads the content of every package in `downloads` and returns the
/// resolutions of all registry dependencies.
///
/// Each download runs in its own spawned task. When a download completes,
/// every dependency waiting on it is given its resolution.
///
/// # Errors
///
/// Returns an error if downloads are required while `network_allowed` is
/// `false`, if a spawned task cannot be joined, or if a download fails.
///
/// # Panics
///
/// Panics if a dependency is resolved twice, or if the returned iterator
/// reaches a dependency that has no resolution.
async fn download_and_resolve(
    mut registries: IndexMap<&'a str, Registry<'a>>,
    downloads: DownloadMap<'a>,
    terminal: &Terminal,
    network_allowed: bool,
) -> Result<impl Iterator<Item = DependencyResolution> + 'a> {
    if !downloads.is_empty() {
        if !network_allowed {
            bail!("a component package download is required but network access is disabled");
        }

        terminal.status("Downloading", "component registry packages")?;

        let mut progress =
            ProgressBar::with_style("Downloading", ProgressStyle::Ratio, terminal);
        let count = downloads.len();
        progress.tick_now(0, count, "")?;

        // Spawn one download task per (registry, package, version) key.
        let mut tasks = FuturesUnordered::new();
        for ((registry_name, name, version), deps) in downloads {
            let (registry_index, _, registry) = registries.get_full(registry_name).unwrap();

            log::info!("downloading content for package `{name}` from component registry `{registry_name}`");

            let client = registry.client.clone();
            tasks.push(tokio::spawn(async move {
                let res = client.download_exact(&name, &version).await;
                (registry_index, name, version, deps, res)
            }))
        }

        assert_eq!(tasks.len(), count);

        let mut finished = 0;
        while let Some(joined) = tasks.next().await {
            let (registry_index, name, version, deps, res) =
                joined.context("failed to join content download task")?;
            let (registry_name, registry) = registries
                .get_index_mut(registry_index)
                .expect("out of bounds registry index");

            let download = res.with_context(|| {
                format!("failed to download package `{name}` (v{version}) from component registry `{registry_name}`")
            })?;

            log::info!(
                "downloaded contents of package `{name}` (v{version}) from component registry `{registry_name}`"
            );

            finished += 1;
            progress.tick_now(
                finished,
                count,
                &format!(": downloaded `{name}` (v{version})"),
            )?;

            // Resolve every dependency that was waiting on this download.
            for index in deps {
                let dependency = &mut registry.dependencies[index];
                assert!(dependency.resolution.is_none());

                // The default registry is recorded as `None`.
                let registry_field = if *registry_name == DEFAULT_REGISTRY_NAME {
                    None
                } else {
                    Some(registry_name.to_string())
                };

                dependency.resolution = Some(RegistryResolution {
                    name: dependency.name.clone(),
                    package: dependency.package.clone(),
                    registry: registry_field,
                    requirement: dependency.version.clone(),
                    version: download.version.clone(),
                    digest: download.digest.clone(),
                    path: download.path.clone(),
                });
            }
        }

        assert_eq!(finished, count);

        progress.clear();
    }

    let resolutions = registries
        .into_values()
        .flat_map(|registry| registry.dependencies)
        .map(|dependency| {
            DependencyResolution::Registry(
                dependency
                    .resolution
                    .expect("dependency should have been resolved"),
            )
        });

    Ok(resolutions)
}
}
/// The resolver's state for a single component registry.
struct Registry<'a> {
/// The client used to talk to the registry; shared with spawned tasks.
client: Arc<FileSystemClient>,
/// Cache of package information loaded from the client, keyed by package name.
packages: HashMap<registry::PackageName, PackageInfo>,
/// The dependencies to resolve from this registry.
dependencies: Vec<RegistryDependency<'a>>,
/// The names of packages whose logs need to be fetched before resolving.
upserts: HashSet<registry::PackageName>,
}
impl<'a> Registry<'a> {
/// Records a dependency on `package` and decides whether the package's log
/// must be updated before resolution.
///
/// No update is requested when a locked version is supplied and the
/// package information already available from the client contains a
/// release with content for that version.
///
/// # Errors
///
/// Returns an error if loading the package information fails.
async fn add_dependency(
    &mut self,
    name: &'a registry::PackageName,
    package: registry::PackageName,
    version: &'a VersionReq,
    registry: &str,
    locked: Option<&LockedPackageVersion>,
) -> Result<()> {
    self.dependencies.push(RegistryDependency {
        name,
        package: package.clone(),
        version,
        locked: locked.map(|l| (l.version.clone(), l.digest.clone())),
        resolution: None,
    });

    // Only a locked version that is already known locally avoids an update.
    let locked_release_known = match locked {
        Some(locked) => {
            match Self::load_package(&self.client, &mut self.packages, package.clone()).await? {
                Some(info) => info
                    .state
                    .release(&locked.version)
                    .and_then(|r| r.content())
                    .is_some(),
                None => false,
            }
        }
        None => false,
    };

    if !locked_release_known && self.upserts.insert(package.clone()) {
        log::info!(
            "package `{package}` from component registry `{registry}` needs to be updated"
        );
    }

    Ok(())
}
/// Selects a release for every dependency of this registry and either
/// resolves it from client storage or schedules it for download.
///
/// A locked version is preferred when a matching release exists; otherwise
/// the latest release matching the dependency's version requirement is
/// used. Dependencies whose content is not in client storage are added to
/// `downloads` by their index in the registry's dependency list.
///
/// # Errors
///
/// Returns an error if a package is not found in the registry, if the
/// digest of a locked release differs from the lock file, or if no release
/// matches the version requirement.
///
/// # Panics
///
/// Panics if a selected release has no content or if a dependency already
/// has a resolution.
async fn add_downloads(
    &mut self,
    registry: &'a str,
    downloads: &mut DownloadMap<'a>,
) -> Result<()> {
    let Self {
        dependencies,
        packages,
        client,
        ..
    } = self;

    for (index, dependency) in dependencies.iter_mut().enumerate() {
        let loaded = Self::load_package(client, packages, dependency.package.clone()).await?;
        let package = loaded.with_context(|| {
            format!(
                "package `{name}` was not found in component registry `{registry}`",
                name = dependency.package
            )
        })?;

        // Try the locked version first, verifying its digest when found.
        let locked_release = match &dependency.locked {
            Some((version, digest)) => {
                let exact_req = VersionReq {
                    comparators: vec![Comparator {
                        op: Op::Exact,
                        major: version.major,
                        minor: Some(version.minor),
                        patch: Some(version.patch),
                        pre: version.pre.clone(),
                    }],
                };

                match package.state.find_latest_release(&exact_req) {
                    Some(r) => {
                        let content = r.content().expect("release must have content");
                        if content != digest {
                            bail!(
                                "component registry package `{name}` (v`{version}`) has digest `{content}` but the lock file specifies digest `{digest}`",
                                name = dependency.package,
                            );
                        }
                        Some(r)
                    }
                    None => None,
                }
            }
            None => None,
        };

        // Fall back to the latest release matching the requirement.
        let release = locked_release
            .or_else(|| package.state.find_latest_release(dependency.version))
            .with_context(|| format!("component registry package `{name}` has no release matching version requirement `{version}`", name = dependency.package, version = dependency.version))?;

        let digest = release.content().expect("release must have content");

        if let Some(path) = client.content().content_location(digest) {
            // The content is already in client storage; resolve immediately.
            assert!(dependency.resolution.is_none());

            // The default registry is recorded as `None`.
            let registry_field = if registry == DEFAULT_REGISTRY_NAME {
                None
            } else {
                Some(registry.to_string())
            };

            dependency.resolution = Some(RegistryResolution {
                name: dependency.name.clone(),
                package: dependency.package.clone(),
                registry: registry_field,
                requirement: dependency.version.clone(),
                version: release.version.clone(),
                digest: digest.clone(),
                path,
            });

            log::info!(
                "version {version} of registry package `{name}` from registry `{registry}` is already in client storage",
                name = dependency.package,
                version = release.version,
            );
            continue;
        }

        // The content is missing; schedule a download for this dependency.
        let indexes = downloads
            .entry((
                registry,
                dependency.package.clone(),
                release.version.clone(),
            ))
            .or_default();

        // Log only the first time a given download is scheduled.
        if indexes.is_empty() {
            log::info!(
                "version {version} of registry package `{name}` from registry `{registry}` needs to be downloaded",
                name = dependency.package,
                version = release.version,
            );
        }

        indexes.push(index);
    }

    Ok(())
}
/// Returns the package information for `name`, loading it from the client
/// and caching it in `packages` on first use.
///
/// Returns `Ok(None)` when the client reports that the package does not
/// exist; that outcome is not cached.
///
/// # Errors
///
/// Returns any other client error.
async fn load_package<'b>(
    client: &FileSystemClient,
    packages: &'b mut HashMap<registry::PackageName, PackageInfo>,
    name: registry::PackageName,
) -> Result<Option<&'b PackageInfo>> {
    let vacant = match packages.entry(name) {
        hash_map::Entry::Occupied(cached) => return Ok(Some(cached.into_mut())),
        hash_map::Entry::Vacant(vacant) => vacant,
    };

    match client.package(vacant.key()).await {
        Ok(info) => Ok(Some(vacant.insert(info))),
        Err(warg_client::ClientError::PackageDoesNotExist { .. }) => Ok(None),
        Err(err) => Err(err.into()),
    }
}
}
/// Key of a pending download: registry name, package name, and package version.
type DownloadMapKey<'a> = (&'a str, registry::PackageName, Version);
/// Maps each pending download to the indexes (into the registry's dependency
/// list) of the dependencies waiting on it.
type DownloadMap<'a> = HashMap<DownloadMapKey<'a>, Vec<usize>>;
/// A dependency that is resolved from a component registry.
struct RegistryDependency<'a> {
/// The name of the dependency.
name: &'a registry::PackageName,
/// The name of the package in the registry.
package: registry::PackageName,
/// The version requirement of the dependency.
version: &'a VersionReq,
/// The version and digest from the lock file, if the dependency is locked.
locked: Option<(Version, AnyHash)>,
/// The resolution of the dependency; `None` until it has been resolved.
resolution: Option<RegistryResolution>,
}
/// A map from dependency name to its resolution.
pub type DependencyResolutionMap = HashMap<registry::PackageName, DependencyResolution>;