wasmtime_wasi/
filesystem.rs

1use crate::bindings::filesystem::types;
2use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle};
3use crate::{InputStream, OutputStream, Pollable, StreamError, StreamResult, TrappableError};
4use anyhow::anyhow;
5use bytes::{Bytes, BytesMut};
6use std::io;
7use std::mem;
8use std::sync::Arc;
9
10pub type FsResult<T> = Result<T, FsError>;
11
12pub type FsError = TrappableError<types::ErrorCode>;
13
14impl From<wasmtime::component::ResourceTableError> for FsError {
15    fn from(error: wasmtime::component::ResourceTableError) -> Self {
16        Self::trap(error)
17    }
18}
19
20impl From<io::Error> for FsError {
21    fn from(error: io::Error) -> Self {
22        types::ErrorCode::from(error).into()
23    }
24}
25
26pub enum Descriptor {
27    File(File),
28    Dir(Dir),
29}
30
31impl Descriptor {
32    pub fn file(&self) -> Result<&File, types::ErrorCode> {
33        match self {
34            Descriptor::File(f) => Ok(f),
35            Descriptor::Dir(_) => Err(types::ErrorCode::BadDescriptor),
36        }
37    }
38
39    pub fn dir(&self) -> Result<&Dir, types::ErrorCode> {
40        match self {
41            Descriptor::Dir(d) => Ok(d),
42            Descriptor::File(_) => Err(types::ErrorCode::NotDirectory),
43        }
44    }
45
46    pub fn is_file(&self) -> bool {
47        match self {
48            Descriptor::File(_) => true,
49            Descriptor::Dir(_) => false,
50        }
51    }
52
53    pub fn is_dir(&self) -> bool {
54        match self {
55            Descriptor::File(_) => false,
56            Descriptor::Dir(_) => true,
57        }
58    }
59}
60
61bitflags::bitflags! {
62    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
63    pub struct FilePerms: usize {
64        const READ = 0b1;
65        const WRITE = 0b10;
66    }
67}
68
69bitflags::bitflags! {
70    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
71    pub struct OpenMode: usize {
72        const READ = 0b1;
73        const WRITE = 0b10;
74    }
75}
76
77#[derive(Clone)]
78pub struct File {
79    /// The operating system File this struct is mediating access to.
80    ///
81    /// Wrapped in an Arc because the same underlying file is used for
82    /// implementing the stream types. A copy is also needed for
83    /// [`spawn_blocking`].
84    ///
85    /// [`spawn_blocking`]: Self::spawn_blocking
86    pub file: Arc<cap_std::fs::File>,
87    /// Permissions to enforce on access to the file. These permissions are
88    /// specified by a user of the `crate::WasiCtxBuilder`, and are
89    /// enforced prior to any enforced by the underlying operating system.
90    pub perms: FilePerms,
91    /// The mode the file was opened under: bits for reading, and writing.
92    /// Required to correctly report the DescriptorFlags, because cap-std
93    /// doesn't presently provide a cross-platform equivalent of reading the
94    /// oflags back out using fcntl.
95    pub open_mode: OpenMode,
96
97    allow_blocking_current_thread: bool,
98}
99
100impl File {
101    pub fn new(
102        file: cap_std::fs::File,
103        perms: FilePerms,
104        open_mode: OpenMode,
105        allow_blocking_current_thread: bool,
106    ) -> Self {
107        Self {
108            file: Arc::new(file),
109            perms,
110            open_mode,
111            allow_blocking_current_thread,
112        }
113    }
114
115    /// Execute the blocking `body` function.
116    ///
117    /// Depending on how the WasiCtx was configured, the body may either be:
118    /// - Executed directly on the current thread. In this case the `async`
119    ///   signature of this method is effectively a lie and the returned
120    ///   Future will always be immediately Ready. Or:
121    /// - Spawned on a background thread using [`tokio::task::spawn_blocking`]
122    ///   and immediately awaited.
123    ///
124    /// Intentionally blocking the executor thread might seem unorthodox, but is
125    /// not actually a problem for specific workloads. See:
126    /// - [`crate::WasiCtxBuilder::allow_blocking_current_thread`]
127    /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973)
128    /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190)
129    pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
130    where
131        F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
132        R: Send + 'static,
133    {
134        match self.as_blocking_file() {
135            Some(file) => body(file),
136            None => self.spawn_blocking(body).await,
137        }
138    }
139
140    pub(crate) fn spawn_blocking<F, R>(&self, body: F) -> AbortOnDropJoinHandle<R>
141    where
142        F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
143        R: Send + 'static,
144    {
145        let f = self.file.clone();
146        spawn_blocking(move || body(&f))
147    }
148
149    /// Returns `Some` when the current thread is allowed to block in filesystem
150    /// operations, and otherwise returns `None` to indicate that
151    /// `spawn_blocking` must be used.
152    pub(crate) fn as_blocking_file(&self) -> Option<&cap_std::fs::File> {
153        if self.allow_blocking_current_thread {
154            Some(&self.file)
155        } else {
156            None
157        }
158    }
159}
160
161bitflags::bitflags! {
162    /// Permission bits for operating on a directory.
163    ///
164    /// Directories can be limited to being readonly. This will restrict what
165    /// can be done with them, for example preventing creation of new files.
166    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
167    pub struct DirPerms: usize {
168        /// This directory can be read, for example its entries can be iterated
169        /// over and files can be opened.
170        const READ = 0b1;
171
172        /// This directory can be mutated, for example by creating new files
173        /// within it.
174        const MUTATE = 0b10;
175    }
176}
177
178#[derive(Clone)]
179pub struct Dir {
180    /// The operating system file descriptor this struct is mediating access
181    /// to.
182    ///
183    /// Wrapped in an Arc because a copy is needed for [`spawn_blocking`].
184    ///
185    /// [`spawn_blocking`]: Self::spawn_blocking
186    pub dir: Arc<cap_std::fs::Dir>,
187    /// Permissions to enforce on access to this directory. These permissions
188    /// are specified by a user of the `crate::WasiCtxBuilder`, and
189    /// are enforced prior to any enforced by the underlying operating system.
190    ///
191    /// These permissions are also enforced on any directories opened under
192    /// this directory.
193    pub perms: DirPerms,
194    /// Permissions to enforce on any files opened under this directory.
195    pub file_perms: FilePerms,
196    /// The mode the directory was opened under: bits for reading, and writing.
197    /// Required to correctly report the DescriptorFlags, because cap-std
198    /// doesn't presently provide a cross-platform equivalent of reading the
199    /// oflags back out using fcntl.
200    pub open_mode: OpenMode,
201
202    allow_blocking_current_thread: bool,
203}
204
205impl Dir {
206    pub fn new(
207        dir: cap_std::fs::Dir,
208        perms: DirPerms,
209        file_perms: FilePerms,
210        open_mode: OpenMode,
211        allow_blocking_current_thread: bool,
212    ) -> Self {
213        Dir {
214            dir: Arc::new(dir),
215            perms,
216            file_perms,
217            open_mode,
218            allow_blocking_current_thread,
219        }
220    }
221
222    /// Execute the blocking `body` function.
223    ///
224    /// Depending on how the WasiCtx was configured, the body may either be:
225    /// - Executed directly on the current thread. In this case the `async`
226    ///   signature of this method is effectively a lie and the returned
227    ///   Future will always be immediately Ready. Or:
228    /// - Spawned on a background thread using [`tokio::task::spawn_blocking`]
229    ///   and immediately awaited.
230    ///
231    /// Intentionally blocking the executor thread might seem unorthodox, but is
232    /// not actually a problem for specific workloads. See:
233    /// - [`crate::WasiCtxBuilder::allow_blocking_current_thread`]
234    /// - [Poor performance of wasmtime file I/O maybe because tokio](https://github.com/bytecodealliance/wasmtime/issues/7973)
235    /// - [Implement opt-in for enabling WASI to block the current thread](https://github.com/bytecodealliance/wasmtime/pull/8190)
236    pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
237    where
238        F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
239        R: Send + 'static,
240    {
241        if self.allow_blocking_current_thread {
242            body(&self.dir)
243        } else {
244            let d = self.dir.clone();
245            spawn_blocking(move || body(&d)).await
246        }
247    }
248}
249
250pub struct FileInputStream {
251    file: File,
252    position: u64,
253    state: ReadState,
254}
255enum ReadState {
256    Idle,
257    Waiting(AbortOnDropJoinHandle<ReadState>),
258    DataAvailable(Bytes),
259    Error(io::Error),
260    Closed,
261}
262impl FileInputStream {
263    pub fn new(file: &File, position: u64) -> Self {
264        Self {
265            file: file.clone(),
266            position,
267            state: ReadState::Idle,
268        }
269    }
270
271    fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
272        use system_interface::fs::FileIoExt;
273
274        let mut buf = BytesMut::zeroed(size);
275        loop {
276            match file.read_at(&mut buf, offset) {
277                Ok(0) => return ReadState::Closed,
278                Ok(n) => {
279                    buf.truncate(n);
280                    return ReadState::DataAvailable(buf.freeze());
281                }
282                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
283                    // Try again, continue looping
284                }
285                Err(e) => return ReadState::Error(e),
286            }
287        }
288    }
289
290    /// Wait for existing background task to finish, without starting any new background reads.
291    async fn wait_ready(&mut self) {
292        match &mut self.state {
293            ReadState::Waiting(task) => {
294                self.state = task.await;
295            }
296            _ => {}
297        }
298    }
299}
300#[async_trait::async_trait]
301impl InputStream for FileInputStream {
302    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
303        match &mut self.state {
304            ReadState::Idle => {
305                if size == 0 {
306                    return Ok(Bytes::new());
307                }
308
309                let p = self.position;
310                self.state = ReadState::Waiting(
311                    self.file
312                        .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
313                );
314                Ok(Bytes::new())
315            }
316            ReadState::DataAvailable(b) => {
317                let min_len = b.len().min(size);
318                let chunk = b.split_to(min_len);
319                if b.len() == 0 {
320                    self.state = ReadState::Idle;
321                }
322                self.position += min_len as u64;
323                Ok(chunk)
324            }
325            ReadState::Waiting(_) => Ok(Bytes::new()),
326            ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
327                ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
328                _ => unreachable!(),
329            },
330            ReadState::Closed => Err(StreamError::Closed),
331        }
332    }
333    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
334    /// overhead on synchronous file I/O.
335    async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
336        self.wait_ready().await;
337
338        // Before we defer to the regular `read`, make sure it has data ready to go:
339        if let ReadState::Idle = self.state {
340            let p = self.position;
341            self.state = self
342                .file
343                .run_blocking(move |f| Self::blocking_read(f, p, size))
344                .await;
345        }
346
347        self.read(size)
348    }
349    async fn cancel(&mut self) {
350        match mem::replace(&mut self.state, ReadState::Closed) {
351            ReadState::Waiting(task) => {
352                // The task was created using `spawn_blocking`, so unless we're
353                // lucky enough that the task hasn't started yet, the abort
354                // signal won't have any effect and we're forced to wait for it
355                // to run to completion.
356                // From the guest's point of view, `input-stream::drop` then
357                // appears to block. Certainly less than ideal, but arguably still
358                // better than letting the guest rack up an unbounded number of
359                // background tasks. Also, the guest is only blocked if
360                // the stream was dropped mid-read, which we don't expect to
361                // occur frequently.
362                task.cancel().await;
363            }
364            _ => {}
365        }
366    }
367}
368#[async_trait::async_trait]
369impl Pollable for FileInputStream {
370    async fn ready(&mut self) {
371        if let ReadState::Idle = self.state {
372            // The guest hasn't initiated any read, but is nonetheless waiting
373            // for data to be available. We'll start a read for them:
374
375            const DEFAULT_READ_SIZE: usize = 4096;
376            let p = self.position;
377            self.state = ReadState::Waiting(
378                self.file
379                    .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
380            );
381        }
382
383        self.wait_ready().await
384    }
385}
386
387#[derive(Clone, Copy)]
388pub(crate) enum FileOutputMode {
389    Position(u64),
390    Append,
391}
392
393pub(crate) struct FileOutputStream {
394    file: File,
395    mode: FileOutputMode,
396    state: OutputState,
397}
398
399enum OutputState {
400    Ready,
401    /// Allows join future to be awaited in a cancellable manner. Gone variant indicates
402    /// no task is currently outstanding.
403    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
404    /// The last I/O operation failed with this error.
405    Error(io::Error),
406    Closed,
407}
408
409impl FileOutputStream {
410    pub fn write_at(file: &File, position: u64) -> Self {
411        Self {
412            file: file.clone(),
413            mode: FileOutputMode::Position(position),
414            state: OutputState::Ready,
415        }
416    }
417
418    pub fn append(file: &File) -> Self {
419        Self {
420            file: file.clone(),
421            mode: FileOutputMode::Append,
422            state: OutputState::Ready,
423        }
424    }
425
426    fn blocking_write(
427        file: &cap_std::fs::File,
428        mut buf: Bytes,
429        mode: FileOutputMode,
430    ) -> io::Result<usize> {
431        use system_interface::fs::FileIoExt;
432
433        match mode {
434            FileOutputMode::Position(mut p) => {
435                let mut total = 0;
436                loop {
437                    let nwritten = file.write_at(buf.as_ref(), p)?;
438                    // afterwards buf contains [nwritten, len):
439                    let _ = buf.split_to(nwritten);
440                    p += nwritten as u64;
441                    total += nwritten;
442                    if buf.is_empty() {
443                        break;
444                    }
445                }
446                Ok(total)
447            }
448            FileOutputMode::Append => {
449                let mut total = 0;
450                loop {
451                    let nwritten = file.append(buf.as_ref())?;
452                    let _ = buf.split_to(nwritten);
453                    total += nwritten;
454                    if buf.is_empty() {
455                        break;
456                    }
457                }
458                Ok(total)
459            }
460        }
461    }
462}
463
464// FIXME: configurable? determine from how much space left in file?
465const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
466
467#[async_trait::async_trait]
468impl OutputStream for FileOutputStream {
469    fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
470        match self.state {
471            OutputState::Ready => {}
472            OutputState::Closed => return Err(StreamError::Closed),
473            OutputState::Waiting(_) | OutputState::Error(_) => {
474                // a write is pending - this call was not permitted
475                return Err(StreamError::Trap(anyhow!(
476                    "write not permitted: check_write not called first"
477                )));
478            }
479        }
480
481        let m = self.mode;
482        self.state = OutputState::Waiting(
483            self.file
484                .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
485        );
486        Ok(())
487    }
488    /// Specialized blocking_* variant to bypass tokio's task spawning & joining
489    /// overhead on synchronous file I/O.
490    async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
491        self.ready().await;
492
493        match self.state {
494            OutputState::Ready => {}
495            OutputState::Closed => return Err(StreamError::Closed),
496            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
497                OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
498                _ => unreachable!(),
499            },
500            OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
501        }
502
503        let m = self.mode;
504        match self
505            .file
506            .run_blocking(move |f| Self::blocking_write(f, buf, m))
507            .await
508        {
509            Ok(nwritten) => {
510                if let FileOutputMode::Position(p) = &mut self.mode {
511                    *p += nwritten as u64;
512                }
513                self.state = OutputState::Ready;
514                Ok(())
515            }
516            Err(e) => {
517                self.state = OutputState::Closed;
518                Err(StreamError::LastOperationFailed(e.into()))
519            }
520        }
521    }
522    fn flush(&mut self) -> Result<(), StreamError> {
523        match self.state {
524            // Only userland buffering of file writes is in the blocking task,
525            // so there's nothing extra that needs to be done to request a
526            // flush.
527            OutputState::Ready | OutputState::Waiting(_) => Ok(()),
528            OutputState::Closed => Err(StreamError::Closed),
529            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
530                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
531                _ => unreachable!(),
532            },
533        }
534    }
535    fn check_write(&mut self) -> Result<usize, StreamError> {
536        match self.state {
537            OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
538            OutputState::Closed => Err(StreamError::Closed),
539            OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
540                OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
541                _ => unreachable!(),
542            },
543            OutputState::Waiting(_) => Ok(0),
544        }
545    }
546    async fn cancel(&mut self) {
547        match mem::replace(&mut self.state, OutputState::Closed) {
548            OutputState::Waiting(task) => {
549                // The task was created using `spawn_blocking`, so unless we're
550                // lucky enough that the task hasn't started yet, the abort
551                // signal won't have any effect and we're forced to wait for it
552                // to run to completion.
553                // From the guest's point of view, `output-stream::drop` then
554                // appears to block. Certainly less than ideal, but arguably still
555                // better than letting the guest rack up an unbounded number of
556                // background tasks. Also, the guest is only blocked if
557                // the stream was dropped mid-write, which we don't expect to
558                // occur frequently.
559                task.cancel().await;
560            }
561            _ => {}
562        }
563    }
564}
565
566#[async_trait::async_trait]
567impl Pollable for FileOutputStream {
568    async fn ready(&mut self) {
569        if let OutputState::Waiting(task) = &mut self.state {
570            self.state = match task.await {
571                Ok(nwritten) => {
572                    if let FileOutputMode::Position(p) = &mut self.mode {
573                        *p += nwritten as u64;
574                    }
575                    OutputState::Ready
576                }
577                Err(e) => OutputState::Error(e),
578            };
579        }
580    }
581}
582
583pub struct ReaddirIterator(
584    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
585);
586
587impl ReaddirIterator {
588    pub(crate) fn new(
589        i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
590    ) -> Self {
591        ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
592    }
593    pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
594        self.0.lock().unwrap().next().transpose()
595    }
596}
597
598impl IntoIterator for ReaddirIterator {
599    type Item = FsResult<types::DirectoryEntry>;
600    type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
601
602    fn into_iter(self) -> Self::IntoIter {
603        self.0.into_inner().unwrap()
604    }
605}