1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
use std::sync::Arc;
use anyhow::Result;
use log::trace;
use tokio::task::JoinHandle;
use wasmtime::{ResourceLimiter, Val};
use crate::env::Environment;
use crate::runtimes::wasmtime::{WasmtimeCompiledModule, WasmtimeRuntime};
use crate::state::ProcessState;
use crate::{Process, Signal, WasmProcess};
/// Spawns a new wasm process from a compiled module.
///
/// A `Process` is created from a `module`, entry `function`, array of arguments and config. The
/// configuration will define some characteristics of the process, such as maximum memory, fuel
/// and host function properties (filesystem access, networking, ..).
///
/// After it's spawned the process will keep running in the background. A process can be killed
/// with `Signal::Kill` signal. If you would like to block until the process is finished you can
/// `.await` on the returned `JoinHandle<()>`.
pub async fn spawn_wasm<S>(
env: Arc<dyn Environment>,
runtime: WasmtimeRuntime,
module: &WasmtimeCompiledModule<S>,
state: S,
function: &str,
params: Vec<Val>,
link: Option<(Option<i64>, Arc<dyn Process>)>,
) -> Result<(JoinHandle<Result<S>>, Arc<dyn Process>)>
where
S: ProcessState + Send + Sync + ResourceLimiter + 'static,
{
let id = state.id();
trace!("Spawning process: {}", id);
let signal_mailbox = state.signal_mailbox().clone();
let message_mailbox = state.message_mailbox().clone();
let instance = runtime.instantiate(module, state).await?;
let function = function.to_string();
let fut = async move { instance.call(&function, params).await };
let child_process = crate::new(fut, id, env.clone(), signal_mailbox.1, message_mailbox);
let child_process_handle = Arc::new(WasmProcess::new(id, signal_mailbox.0.clone()));
env.add_process(id, child_process_handle.clone());
// **Child link guarantees**:
// The link signal is going to be put inside of the child's mailbox and is going to be
// processed before any child code can run. This means that any failure inside the child
// Wasm code will be correctly reported to the parent.
//
// We assume here that the code inside of `process::new()` will not fail during signal
// handling.
//
// **Parent link guarantees**:
// A `tokio::task::yield_now()` call is executed to allow the parent to link the child
// before continuing any further execution. This should force the parent to process all
// signals right away.
//
// The parent could have received a `kill` signal in its mailbox before this function was
// called and this signal is going to be processed before the link is established (FIFO).
// Only after the yield function we can guarantee that the child is going to be notified
// if the parent fails. This is ok, as the actual spawning of the child happens after the
// call, so the child wouldn't even exist if the parent failed before.
//
// TODO: The guarantees provided here don't hold anymore in a distributed environment and
// will require some rethinking. This function will be executed on a completely
// different computer and needs to be synced in a more robust way with the parent
// running somewhere else.
if let Some((tag, process)) = link {
// Send signal to itself to perform the linking
process.send(Signal::Link(None, child_process_handle.clone()));
// Suspend itself to process all new signals
tokio::task::yield_now().await;
// Send signal to child to link it
signal_mailbox
.0
.send(Signal::Link(tag, process))
.expect("receiver must exist at this point");
}
// Spawn a background process
trace!("Process size: {}", std::mem::size_of_val(&child_process));
let join = tokio::task::spawn(child_process);
Ok((join, child_process_handle))
}