use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::{Context, Error};
use futures::{future::BoxFuture, StreamExt, TryStreamExt};
use once_cell::sync::OnceCell;
use petgraph::visit::EdgeRef;
use virtual_fs::{FileSystem, OverlayFileSystem, UnionFileSystem, WebcVolumeFileSystem};
use wasmer_config::package::PackageId;
use webc::metadata::annotations::Atom as AtomAnnotation;
use webc::{Container, Volume};
use crate::{
bin_factory::{BinaryPackage, BinaryPackageCommand},
runtime::{
package_loader::PackageLoader,
resolver::{
DependencyGraph, ItemLocation, PackageSummary, Resolution, ResolvedFileSystemMapping,
ResolvedPackage,
},
},
};
use super::to_module_hash;
/// Upper bound on how many dependency packages `fetch_dependencies` loads
/// concurrently.
const MAX_PARALLEL_DOWNLOADS: usize = 32;
/// Loads the full package tree described by `resolution` into a single
/// [`BinaryPackage`].
///
/// Every non-root package that contributes a command or a filesystem mapping
/// is fetched through `loader`; `root` is used directly as the root package's
/// container. When `root_is_local_dir` is set, mappings owned by the root
/// package are left out of the combined filesystem.
///
/// # Errors
///
/// Fails if a dependency can't be loaded, if the filesystem can't be
/// assembled, or if any command can't be loaded.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn load_package_tree(
    root: &Container,
    loader: &dyn PackageLoader,
    resolution: &Resolution,
    root_is_local_dir: bool,
) -> Result<BinaryPackage, Error> {
    let resolved = &resolution.package;
    let root_id = &resolved.root_package;

    let mut containers = fetch_dependencies(loader, resolved, &resolution.graph).await?;
    // The root container is supplied by the caller rather than downloaded.
    containers.insert(root_id.clone(), root.clone());
    let package_ids = containers.keys().cloned().collect();

    let fs = filesystem(&containers, resolved, root_is_local_dir)?;
    let commands: Vec<BinaryPackageCommand> =
        commands(&resolved.commands, &containers, resolution)?;
    let file_system_memory_footprint = count_file_system(&fs, Path::new("/"));

    let when_cached = crate::syscalls::platform_clock_time_get(
        wasmer_wasix_types::wasi::Snapshot0Clockid::Monotonic,
        1_000_000,
    )
    .ok()
    .map(|ts| ts as u128);

    Ok(BinaryPackage {
        id: root_id.clone(),
        package_ids,
        when_cached,
        hash: OnceCell::new(),
        entrypoint_cmd: resolved.entrypoint.clone(),
        webc_fs: Arc::new(fs),
        commands,
        uses: Vec::new(),
        file_system_memory_footprint,
        additional_host_mapped_directories: vec![],
    })
}
/// Loads every command exposed by the resolved package.
///
/// `commands` maps each public command name to the package that provides it
/// and the command's name inside that package. Commands whose runner isn't
/// supported are skipped rather than reported as errors.
///
/// # Errors
///
/// Fails if a command comes from a package that wasn't loaded, if that
/// package's manifest doesn't declare the command, or if loading the
/// command's atom fails.
fn commands(
    commands: &BTreeMap<String, ItemLocation>,
    containers: &HashMap<PackageId, Container>,
    resolution: &Resolution,
) -> Result<Vec<BinaryPackageCommand>, Error> {
    let mut pkg_commands = Vec::with_capacity(commands.len());

    for (
        name,
        ItemLocation {
            name: original_name,
            package,
        },
    ) in commands
    {
        // Use fallible lookups: a dependency without a distribution is never
        // fetched, so indexing here would panic instead of reporting it.
        let webc = containers.get(package).with_context(|| {
            format!(
                "The \"{name}\" command comes from \"{package}\", but that package wasn't loaded"
            )
        })?;
        let manifest = webc.manifest();
        let command_metadata = manifest.commands.get(original_name).with_context(|| {
            format!("The \"{package}\" package doesn't declare a \"{original_name}\" command")
        })?;

        if let Some(cmd) =
            load_binary_command(package, name, command_metadata, containers, resolution)?
        {
            pkg_commands.push(cmd);
        }
    }

    Ok(pkg_commands)
}
/// Loads the command `command_name`, declared by `package_id`.
///
/// The atom is chosen by `atom_name_for_command`. If its annotation names a
/// `dependency`, the atom is taken from the package that the dependency alias
/// resolves to in `resolution.graph`; otherwise it comes from `package_id`
/// itself. When the atom is missing and the command carries no annotations at
/// all, `legacy_atom_hack` is used instead.
///
/// Returns `Ok(None)` for commands whose runner isn't supported.
///
/// # Errors
///
/// Fails if the package or its dependency can't be found in the dependency
/// graph or among the loaded containers, if the atom is missing, or if its
/// signature can't be read from the manifest.
#[tracing::instrument(skip_all, fields(%package_id, %command_name))]
fn load_binary_command(
    package_id: &PackageId,
    command_name: &str,
    cmd: &webc::metadata::Command,
    containers: &HashMap<PackageId, Container>,
    resolution: &Resolution,
) -> Result<Option<BinaryPackageCommand>, anyhow::Error> {
    let AtomAnnotation {
        name: atom_name,
        dependency,
        ..
    } = match atom_name_for_command(command_name, cmd)? {
        Some(annotation) => annotation,
        None => {
            tracing::warn!(
                cmd.name=command_name,
                cmd.runner=%cmd.runner,
                "Skipping unsupported command",
            );
            return Ok(None);
        }
    };

    let (webc, resolved_package_id) = match dependency {
        Some(dep) => {
            let ix = resolution
                .graph
                .packages()
                .get(package_id)
                .copied()
                .with_context(|| format!("\"{package_id}\" isn't in the dependency graph"))?;
            let graph = resolution.graph.graph();
            let edge_reference = graph
                .edges_directed(ix, petgraph::Direction::Outgoing)
                .find(|edge| edge.weight().alias == dep)
                .with_context(|| {
                    format!(
                        "Unable to find the \"{dep}\" dependency for the \"{command_name}\" command in \"{package_id}\""
                    )
                })?;
            let other_package = graph
                .node_weight(edge_reference.target())
                .expect("an edge's target is always a node of the same graph");
            let id = &other_package.id;

            tracing::debug!(
                dependency=%dep,
                resolved_package_id=%id,
                "command atom resolution: resolved dependency",
            );

            // The dependency may not have been fetched (e.g. it has no
            // distribution), so report it instead of panicking on the index.
            let webc = containers.get(id).with_context(|| {
                format!(
                    "The \"{command_name}\" command in \"{package_id}\" uses an atom from \"{id}\", but that package wasn't loaded"
                )
            })?;
            (webc, id)
        }
        None => {
            let webc = containers
                .get(package_id)
                .with_context(|| format!("The \"{package_id}\" package wasn't loaded"))?;
            (webc, package_id)
        }
    };

    let atom = webc.get_atom(&atom_name);
    if atom.is_none() && cmd.annotations.is_empty() {
        tracing::info!("applying legacy atom hack");
        return legacy_atom_hack(webc, command_name, cmd);
    }

    // Resolve the atom before reading its signature so a missing atom always
    // produces this descriptive error, however `atom_signature` treats
    // unknown names.
    let atom = atom.with_context(|| {
        let available_atoms = webc.atoms().keys().map(|x| x.as_str()).collect::<Vec<_>>().join(",");
        tracing::warn!(
            %atom_name,
            %resolved_package_id,
            %available_atoms,
            "invalid command: could not find atom in package",
        );
        format!(
            "The '{command_name}' command uses the '{atom_name}' atom, but it isn't present in the package: {resolved_package_id}"
        )
    })?;
    let hash = to_module_hash(webc.manifest().atom_signature(&atom_name)?);

    let cmd = BinaryPackageCommand::new(command_name.to_string(), cmd.clone(), atom, hash);
    Ok(Some(cmd))
}
/// Works out which atom a command should run.
///
/// An explicit atom annotation on the command always wins. Without one, WASI
/// and WCGI commands fall back to an atom named after the command; any other
/// runner yields `Ok(None)`.
///
/// # Errors
///
/// Fails if the command's atom annotations can't be deserialized.
fn atom_name_for_command(
    command_name: &str,
    cmd: &webc::metadata::Command,
) -> Result<Option<AtomAnnotation>, anyhow::Error> {
    use webc::metadata::annotations::{WASI_RUNNER_URI, WCGI_RUNNER_URI};

    let annotated = cmd
        .atom()
        .context("Unable to deserialize atom annotations")?;
    if annotated.is_some() {
        return Ok(annotated);
    }

    let falls_back_to_command_name = [WASI_RUNNER_URI, WCGI_RUNNER_URI]
        .iter()
        .any(|uri| cmd.runner.starts_with(*uri));
    if !falls_back_to_command_name {
        return Ok(None);
    }

    tracing::debug!(
        command = command_name,
        "No annotations specifying the atom name found. Falling back to the command name"
    );
    Ok(Some(AtomAnnotation::new(command_name, None)))
}
fn legacy_atom_hack(
webc: &Container,
command_name: &str,
metadata: &webc::metadata::Command,
) -> Result<Option<BinaryPackageCommand>, anyhow::Error> {
let (name, atom) = webc
.atoms()
.into_iter()
.next()
.ok_or_else(|| anyhow::Error::msg("container does not have any atom"))?;
tracing::debug!(
command_name,
atom.name = name.as_str(),
atom.len = atom.len(),
"(hack) The command metadata is malformed. Falling back to the first atom in the WEBC file",
);
let hash = to_module_hash(webc.manifest().atom_signature(&name)?);
Ok(Some(BinaryPackageCommand::new(
command_name.to_string(),
metadata.clone(),
atom,
hash,
)))
}
async fn fetch_dependencies(
loader: &dyn PackageLoader,
pkg: &ResolvedPackage,
graph: &DependencyGraph,
) -> Result<HashMap<PackageId, Container>, Error> {
let mut packages = HashSet::new();
for loc in pkg.commands.values() {
packages.insert(loc.package.clone());
}
for mapping in &pkg.filesystem {
packages.insert(mapping.package.clone());
}
packages.remove(&pkg.root_package);
let packages = packages.into_iter().filter_map(|id| {
let crate::runtime::resolver::Node { pkg, dist, .. } = &graph[&id];
let summary = PackageSummary {
pkg: pkg.clone(),
dist: dist.clone()?,
};
Some((id, summary))
});
let packages: HashMap<PackageId, Container> = futures::stream::iter(packages)
.map(|(id, s)| async move {
match loader.load(&s).await {
Ok(webc) => Ok((id, webc)),
Err(e) => Err(e),
}
})
.buffer_unordered(MAX_PARALLEL_DOWNLOADS)
.try_collect()
.await?;
Ok(packages)
}
/// Recursively sums the metadata `len()` of every entry below `path`.
///
/// Directories contribute their own metadata length plus the total of their
/// contents. This is best-effort: a directory that can't be read counts as
/// `0`, and entries whose metadata can't be read are skipped.
fn count_file_system(fs: &dyn FileSystem, path: &Path) -> u64 {
    match fs.read_dir(path) {
        Ok(entries) => entries
            .flatten()
            .filter_map(|entry| {
                let meta = entry.metadata().ok()?;
                let nested = if meta.is_dir() {
                    count_file_system(fs, entry.path.as_path())
                } else {
                    0
                };
                Some(meta.len() + nested)
            })
            .sum(),
        Err(_) => 0,
    }
}
fn filesystem(
packages: &HashMap<PackageId, Container>,
pkg: &ResolvedPackage,
root_is_local_dir: bool,
) -> Result<Box<dyn FileSystem + Send + Sync>, Error> {
if pkg.filesystem.is_empty() {
return Ok(Box::new(OverlayFileSystem::<
virtual_fs::EmptyFileSystem,
Vec<WebcVolumeFileSystem>,
>::new(
virtual_fs::EmptyFileSystem::default(), vec![]
)));
}
let mut found_v2 = false;
let mut found_v3 = false;
for ResolvedFileSystemMapping { package, .. } in &pkg.filesystem {
let container = packages.get(package).with_context(|| {
format!(
"\"{}\" wants to use the \"{}\" package, but it isn't in the dependency tree",
pkg.root_package, package,
)
})?;
found_v2 |= container.version() == webc::Version::V2;
found_v3 |= container.version() == webc::Version::V3;
}
if found_v3 && !found_v2 {
filesystem_v3(packages, pkg, root_is_local_dir)
} else {
filesystem_v2(packages, pkg, root_is_local_dir)
}
}
/// Builds the filesystem for a package tree whose containers are WEBC v3.
///
/// Each mapping's volume is mounted into a [`UnionFileSystem`] at its
/// `mount_path` (named after the volume), and the union is layered over an
/// empty base filesystem. When `root_is_local_dir` is set, mappings owned by
/// the root package are skipped.
///
/// # Errors
///
/// Fails if a mapping refers to a package missing from `packages`, names a
/// volume that package doesn't have, or can't be mounted.
fn filesystem_v3(
    packages: &HashMap<PackageId, Container>,
    pkg: &ResolvedPackage,
    root_is_local_dir: bool,
) -> Result<Box<dyn FileSystem + Send + Sync>, Error> {
    // Each container's volumes are read at most once, however many mappings use it.
    let mut volumes: HashMap<&PackageId, BTreeMap<String, Volume>> = HashMap::new();
    let union_fs = UnionFileSystem::new();

    // NOTE(review): mappings are mounted in declaration order. If nested mount
    // points need the deeper path to take precedence, iterate a copy sorted by
    // `Reverse(mount_path)` instead — confirm the intended ordering.
    for ResolvedFileSystemMapping {
        mount_path,
        volume_name,
        package,
        ..
    } in &pkg.filesystem
    {
        if *package == pkg.root_package && root_is_local_dir {
            continue;
        }

        let container_volumes = match volumes.entry(package) {
            std::collections::hash_map::Entry::Occupied(entry) => &*entry.into_mut(),
            std::collections::hash_map::Entry::Vacant(entry) => {
                let container = packages.get(package).with_context(|| {
                    format!(
                        "\"{}\" wants to use the \"{}\" package, but it isn't in the dependency tree",
                        pkg.root_package, package,
                    )
                })?;
                &*entry.insert(container.volumes())
            }
        };
        let volume = container_volumes.get(volume_name).with_context(|| {
            format!("The \"{package}\" package doesn't have a \"{volume_name}\" volume")
        })?;

        let webc_vol = WebcVolumeFileSystem::new(volume.clone());
        union_fs
            .mount(volume_name.clone(), mount_path, Box::new(webc_vol))
            .with_context(|| {
                format!(
                    "Unable to mount the \"{volume_name}\" volume from \"{package}\" at \"{}\"",
                    mount_path.display()
                )
            })?;
    }

    let fs = OverlayFileSystem::new(virtual_fs::EmptyFileSystem::default(), [union_fs]);
    Ok(Box::new(fs))
}
/// Builds the filesystem for a package tree that uses the WEBC v2 layout.
///
/// Every mapping becomes a [`MappedPathFileSystem`] over the mapped volume.
/// Incoming paths must lie under the mapping's `mount_path`; that prefix is
/// stripped and, when `original_path` is set, the remainder is re-based onto
/// it. All of these are layered as secondaries over an empty base filesystem.
/// When `root_is_local_dir` is set, mappings owned by the root package are
/// skipped.
///
/// # Errors
///
/// Fails if a mapping refers to a package missing from `packages`, or names a
/// volume that package doesn't have.
fn filesystem_v2(
    packages: &HashMap<PackageId, Container>,
    pkg: &ResolvedPackage,
    root_is_local_dir: bool,
) -> Result<Box<dyn FileSystem + Send + Sync>, Error> {
    let mut filesystems = Vec::new();
    // Each container's volumes are read at most once, however many mappings use it.
    let mut volumes: HashMap<&PackageId, BTreeMap<String, Volume>> = HashMap::new();

    // NOTE(review): mappings are layered in declaration order. If nested mount
    // points need the deeper path to take precedence, iterate a copy sorted by
    // `Reverse(mount_path)` instead — confirm the intended ordering.
    for ResolvedFileSystemMapping {
        mount_path,
        volume_name,
        package,
        original_path,
    } in &pkg.filesystem
    {
        if *package == pkg.root_package && root_is_local_dir {
            continue;
        }

        let container_volumes = match volumes.entry(package) {
            std::collections::hash_map::Entry::Occupied(entry) => &*entry.into_mut(),
            std::collections::hash_map::Entry::Vacant(entry) => {
                let container = packages.get(package).with_context(|| {
                    format!(
                        "\"{}\" wants to use the \"{}\" package, but it isn't in the dependency tree",
                        pkg.root_package, package,
                    )
                })?;
                &*entry.insert(container.volumes())
            }
        };
        let volume = container_volumes.get(volume_name).with_context(|| {
            format!("The \"{package}\" package doesn't have a \"{volume_name}\" volume")
        })?;

        // One mapper handles both cases: paths outside `mount_path` are
        // rejected, the rest are made relative to it and, if an original path
        // was recorded, joined back onto that path.
        let mount_path = mount_path.clone();
        let base = original_path.as_ref().map(PathBuf::from);
        let mapper: DynPathMapper = Box::new(
            move |path: &Path| -> Result<PathBuf, virtual_fs::FsError> {
                let relative = path
                    .strip_prefix(&mount_path)
                    .map_err(|_| virtual_fs::FsError::BaseNotDirectory)?;
                Ok(match &base {
                    Some(base) => base.join(relative),
                    None => relative.to_owned(),
                })
            },
        );

        filesystems.push(MappedPathFileSystem::new(
            WebcVolumeFileSystem::new(volume.clone()),
            mapper,
        ));
    }

    let fs = OverlayFileSystem::new(virtual_fs::EmptyFileSystem::default(), filesystems);
    Ok(Box::new(fs))
}
/// Boxed, thread-shareable path translator used as the mapper of
/// `MappedPathFileSystem` in the v2 filesystem layout. It returns a
/// `virtual_fs::FsError` when an incoming path can't be mapped.
type DynPathMapper = Box<dyn Fn(&Path) -> Result<PathBuf, virtual_fs::FsError> + Send + Sync>;
/// A [`FileSystem`] adapter that translates every incoming path with `map`
/// and forwards the call to `inner`.
///
/// The translated path is re-rooted under `/` before it reaches `inner`.
#[derive(Clone, PartialEq)]
struct MappedPathFileSystem<F, M> {
    // The wrapped filesystem that receives the translated paths.
    inner: F,
    // Translates an outer path into a path on `inner`; an `Err` aborts the call.
    map: M,
}
impl<F, M> MappedPathFileSystem<F, M>
where
    M: Fn(&Path) -> Result<PathBuf, virtual_fs::FsError> + Send + Sync + 'static,
{
    /// Wraps `inner` so every path is first translated by `map`.
    fn new(inner: F, map: M) -> Self {
        Self { inner, map }
    }

    /// Translates an outer path into the path used on `inner`.
    ///
    /// The mapper's result is joined onto `/`; mapper errors are returned
    /// unchanged.
    fn path(&self, path: &Path) -> Result<PathBuf, virtual_fs::FsError> {
        let mapper = &self.map;
        mapper(path).map(|mapped| Path::new("/").join(mapped))
    }
}
/// Every operation translates its path argument(s) with `self.path` and then
/// delegates to the wrapped filesystem; translation errors are returned
/// before `inner` is touched.
impl<M, F> FileSystem for MappedPathFileSystem<F, M>
where
    F: FileSystem,
    M: Fn(&Path) -> Result<PathBuf, virtual_fs::FsError> + Send + Sync + 'static,
{
    fn readlink(&self, path: &Path) -> virtual_fs::Result<PathBuf> {
        self.inner.readlink(&self.path(path)?)
    }

    fn read_dir(&self, path: &Path) -> virtual_fs::Result<virtual_fs::ReadDir> {
        self.inner.read_dir(&self.path(path)?)
    }

    fn create_dir(&self, path: &Path) -> virtual_fs::Result<()> {
        self.inner.create_dir(&self.path(path)?)
    }

    fn remove_dir(&self, path: &Path) -> virtual_fs::Result<()> {
        self.inner.remove_dir(&self.path(path)?)
    }

    fn rename<'a>(&'a self, from: &Path, to: &Path) -> BoxFuture<'a, virtual_fs::Result<()>> {
        // Own the paths so the returned future only borrows `self`.
        let (source, destination) = (from.to_path_buf(), to.to_path_buf());
        Box::pin(async move {
            let source = self.path(&source)?;
            let destination = self.path(&destination)?;
            self.inner.rename(&source, &destination).await
        })
    }

    fn metadata(&self, path: &Path) -> virtual_fs::Result<virtual_fs::Metadata> {
        self.inner.metadata(&self.path(path)?)
    }

    fn symlink_metadata(&self, path: &Path) -> virtual_fs::Result<virtual_fs::Metadata> {
        self.inner.symlink_metadata(&self.path(path)?)
    }

    fn remove_file(&self, path: &Path) -> virtual_fs::Result<()> {
        self.inner.remove_file(&self.path(path)?)
    }

    fn new_open_options(&self) -> virtual_fs::OpenOptions {
        // Opening goes through this type's `FileOpener` impl, so paths get mapped there.
        virtual_fs::OpenOptions::new(self)
    }

    fn mount(
        &self,
        name: String,
        path: &Path,
        fs: Box<dyn FileSystem + Send + Sync>,
    ) -> virtual_fs::Result<()> {
        let target = self.path(path)?;
        self.inner.mount(name, &target, fs)
    }
}
/// Opens files on the wrapped filesystem after translating the path, reusing
/// the caller's open options unchanged.
impl<F, M> virtual_fs::FileOpener for MappedPathFileSystem<F, M>
where
    F: FileSystem,
    M: Fn(&Path) -> Result<PathBuf, virtual_fs::FsError> + Send + Sync + 'static,
{
    fn open(
        &self,
        path: &Path,
        conf: &virtual_fs::OpenOptionsConfig,
    ) -> virtual_fs::Result<Box<dyn virtual_fs::VirtualFile + Send + Sync + 'static>> {
        let mapped = self.path(path)?;
        let mut opener = self.inner.new_open_options();
        opener.options(conf.clone());
        opener.open(mapped)
    }
}
impl<F, M> Debug for MappedPathFileSystem<F, M>
where
    F: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `M` has no `Debug` bound here, so only the mapper's type name is shown.
        let mapper_type = std::any::type_name::<M>();
        let mut out = f.debug_struct("MappedPathFileSystem");
        out.field("inner", &self.inner);
        out.field("map", &mapper_type);
        out.finish()
    }
}