1use crate::bindings::filesystem::types;
2use crate::runtime::{spawn_blocking, AbortOnDropJoinHandle};
3use crate::{InputStream, OutputStream, Pollable, StreamError, StreamResult, TrappableError};
4use anyhow::anyhow;
5use bytes::{Bytes, BytesMut};
6use std::io;
7use std::mem;
8use std::sync::Arc;
9
/// Result alias used by the filesystem code in this file; the error type is [`FsError`].
pub type FsResult<T> = Result<T, FsError>;
11
/// Error type for filesystem operations: a [`TrappableError`] specialised to
/// `types::ErrorCode`.
///
/// Within this file it is produced from an `io::Error` (converted to an
/// `ErrorCode` first) or from a `ResourceTableError` (via `Self::trap`).
pub type FsError = TrappableError<types::ErrorCode>;
13
14impl From<wasmtime::component::ResourceTableError> for FsError {
15 fn from(error: wasmtime::component::ResourceTableError) -> Self {
16 Self::trap(error)
17 }
18}
19
20impl From<io::Error> for FsError {
21 fn from(error: io::Error) -> Self {
22 types::ErrorCode::from(error).into()
23 }
24}
25
/// A filesystem descriptor: either a [`File`] or a [`Dir`].
pub enum Descriptor {
    /// The descriptor wraps a [`File`].
    File(File),
    /// The descriptor wraps a [`Dir`].
    Dir(Dir),
}
30
31impl Descriptor {
32 pub fn file(&self) -> Result<&File, types::ErrorCode> {
33 match self {
34 Descriptor::File(f) => Ok(f),
35 Descriptor::Dir(_) => Err(types::ErrorCode::BadDescriptor),
36 }
37 }
38
39 pub fn dir(&self) -> Result<&Dir, types::ErrorCode> {
40 match self {
41 Descriptor::Dir(d) => Ok(d),
42 Descriptor::File(_) => Err(types::ErrorCode::NotDirectory),
43 }
44 }
45
46 pub fn is_file(&self) -> bool {
47 match self {
48 Descriptor::File(_) => true,
49 Descriptor::Dir(_) => false,
50 }
51 }
52
53 pub fn is_dir(&self) -> bool {
54 match self {
55 Descriptor::File(_) => false,
56 Descriptor::Dir(_) => true,
57 }
58 }
59}
60
61bitflags::bitflags! {
62 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
63 pub struct FilePerms: usize {
64 const READ = 0b1;
65 const WRITE = 0b10;
66 }
67}
68
69bitflags::bitflags! {
70 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
71 pub struct OpenMode: usize {
72 const READ = 0b1;
73 const WRITE = 0b10;
74 }
75}
76
/// A file handle together with its permission/mode flags and blocking policy.
///
/// Cloning is cheap with respect to the OS handle: the `cap_std` file lives
/// behind an [`Arc`], so clones share it.
#[derive(Clone)]
pub struct File {
    /// The underlying `cap_std` file, shared with background blocking tasks.
    pub file: Arc<cap_std::fs::File>,
    /// Permission bits associated with this file.
    // NOTE(review): presumably enforced by callers before touching `file`;
    // no enforcement happens in this file — confirm.
    pub perms: FilePerms,
    /// Mode bits recorded for this file.
    // NOTE(review): presumably the mode the file was opened with — confirm.
    pub open_mode: OpenMode,

    /// When `true`, `run_blocking` invokes its closure directly on the calling
    /// thread; when `false`, the closure is dispatched through `spawn_blocking`.
    allow_blocking_current_thread: bool,
}
99
100impl File {
101 pub fn new(
102 file: cap_std::fs::File,
103 perms: FilePerms,
104 open_mode: OpenMode,
105 allow_blocking_current_thread: bool,
106 ) -> Self {
107 Self {
108 file: Arc::new(file),
109 perms,
110 open_mode,
111 allow_blocking_current_thread,
112 }
113 }
114
115 pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
130 where
131 F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
132 R: Send + 'static,
133 {
134 match self.as_blocking_file() {
135 Some(file) => body(file),
136 None => self.spawn_blocking(body).await,
137 }
138 }
139
140 pub(crate) fn spawn_blocking<F, R>(&self, body: F) -> AbortOnDropJoinHandle<R>
141 where
142 F: FnOnce(&cap_std::fs::File) -> R + Send + 'static,
143 R: Send + 'static,
144 {
145 let f = self.file.clone();
146 spawn_blocking(move || body(&f))
147 }
148
149 pub(crate) fn as_blocking_file(&self) -> Option<&cap_std::fs::File> {
153 if self.allow_blocking_current_thread {
154 Some(&self.file)
155 } else {
156 None
157 }
158 }
159}
160
161bitflags::bitflags! {
162 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
167 pub struct DirPerms: usize {
168 const READ = 0b1;
171
172 const MUTATE = 0b10;
175 }
176}
177
/// A directory handle together with its permission/mode flags and blocking policy.
///
/// The `cap_std` directory lives behind an [`Arc`], so clones share it.
#[derive(Clone)]
pub struct Dir {
    /// The underlying `cap_std` directory, shared with background blocking tasks.
    pub dir: Arc<cap_std::fs::Dir>,
    /// Permission bits associated with this directory.
    // NOTE(review): presumably enforced by callers; not checked in this file — confirm.
    pub perms: DirPerms,
    /// File permission bits carried alongside the directory.
    // NOTE(review): presumably applied to files opened through this directory — confirm.
    pub file_perms: FilePerms,
    /// Mode bits recorded for this directory.
    // NOTE(review): presumably the mode the directory was opened with — confirm.
    pub open_mode: OpenMode,

    /// When `true`, `run_blocking` invokes its closure directly on the calling
    /// thread; when `false`, the closure is dispatched through `spawn_blocking`.
    allow_blocking_current_thread: bool,
}
204
205impl Dir {
206 pub fn new(
207 dir: cap_std::fs::Dir,
208 perms: DirPerms,
209 file_perms: FilePerms,
210 open_mode: OpenMode,
211 allow_blocking_current_thread: bool,
212 ) -> Self {
213 Dir {
214 dir: Arc::new(dir),
215 perms,
216 file_perms,
217 open_mode,
218 allow_blocking_current_thread,
219 }
220 }
221
222 pub(crate) async fn run_blocking<F, R>(&self, body: F) -> R
237 where
238 F: FnOnce(&cap_std::fs::Dir) -> R + Send + 'static,
239 R: Send + 'static,
240 {
241 if self.allow_blocking_current_thread {
242 body(&self.dir)
243 } else {
244 let d = self.dir.clone();
245 spawn_blocking(move || body(&d)).await
246 }
247 }
248}
249
/// An input stream that reads a [`File`] starting from a given byte position.
pub struct FileInputStream {
    /// Handle of the file being read (a clone of the `File` passed to `new`).
    file: File,
    /// Offset of the next read; advanced as buffered bytes are handed out by `read`.
    position: u64,
    /// Current state of the read state machine.
    state: ReadState,
}
/// State machine backing [`FileInputStream`].
enum ReadState {
    /// No read is in flight and no data is buffered.
    Idle,
    /// A background read is in flight; the task resolves to the next state.
    Waiting(AbortOnDropJoinHandle<ReadState>),
    /// Bytes that have been read from the file but not yet returned by `read`.
    DataAvailable(Bytes),
    /// A read failed; `read` reports the error once and then leaves the stream `Closed`.
    Error(io::Error),
    /// A read returned zero bytes, an error was already reported, or the
    /// stream was cancelled; `read` returns `StreamError::Closed`.
    Closed,
}
262impl FileInputStream {
263 pub fn new(file: &File, position: u64) -> Self {
264 Self {
265 file: file.clone(),
266 position,
267 state: ReadState::Idle,
268 }
269 }
270
271 fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
272 use system_interface::fs::FileIoExt;
273
274 let mut buf = BytesMut::zeroed(size);
275 loop {
276 match file.read_at(&mut buf, offset) {
277 Ok(0) => return ReadState::Closed,
278 Ok(n) => {
279 buf.truncate(n);
280 return ReadState::DataAvailable(buf.freeze());
281 }
282 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
283 }
285 Err(e) => return ReadState::Error(e),
286 }
287 }
288 }
289
290 async fn wait_ready(&mut self) {
292 match &mut self.state {
293 ReadState::Waiting(task) => {
294 self.state = task.await;
295 }
296 _ => {}
297 }
298 }
299}
300#[async_trait::async_trait]
301impl InputStream for FileInputStream {
302 fn read(&mut self, size: usize) -> StreamResult<Bytes> {
303 match &mut self.state {
304 ReadState::Idle => {
305 if size == 0 {
306 return Ok(Bytes::new());
307 }
308
309 let p = self.position;
310 self.state = ReadState::Waiting(
311 self.file
312 .spawn_blocking(move |f| Self::blocking_read(f, p, size)),
313 );
314 Ok(Bytes::new())
315 }
316 ReadState::DataAvailable(b) => {
317 let min_len = b.len().min(size);
318 let chunk = b.split_to(min_len);
319 if b.len() == 0 {
320 self.state = ReadState::Idle;
321 }
322 self.position += min_len as u64;
323 Ok(chunk)
324 }
325 ReadState::Waiting(_) => Ok(Bytes::new()),
326 ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
327 ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
328 _ => unreachable!(),
329 },
330 ReadState::Closed => Err(StreamError::Closed),
331 }
332 }
333 async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
336 self.wait_ready().await;
337
338 if let ReadState::Idle = self.state {
340 let p = self.position;
341 self.state = self
342 .file
343 .run_blocking(move |f| Self::blocking_read(f, p, size))
344 .await;
345 }
346
347 self.read(size)
348 }
349 async fn cancel(&mut self) {
350 match mem::replace(&mut self.state, ReadState::Closed) {
351 ReadState::Waiting(task) => {
352 task.cancel().await;
363 }
364 _ => {}
365 }
366 }
367}
368#[async_trait::async_trait]
369impl Pollable for FileInputStream {
370 async fn ready(&mut self) {
371 if let ReadState::Idle = self.state {
372 const DEFAULT_READ_SIZE: usize = 4096;
376 let p = self.position;
377 self.state = ReadState::Waiting(
378 self.file
379 .spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
380 );
381 }
382
383 self.wait_ready().await
384 }
385}
386
/// Where a [`FileOutputStream`] places the bytes it writes.
#[derive(Clone, Copy)]
pub(crate) enum FileOutputMode {
    /// Write at the given byte offset; the stream advances the stored offset
    /// by the number of bytes written after each successful write.
    Position(u64),
    /// Write using `FileIoExt::append`; no offset is tracked.
    Append,
}
392
/// An output stream that writes to a [`File`], either at a tracked offset or
/// in append mode.
pub(crate) struct FileOutputStream {
    /// Handle of the file being written (a clone of the `File` passed to the constructor).
    file: File,
    /// Positioned or append mode; in positioned mode it carries the next write offset.
    mode: FileOutputMode,
    /// Current state of the write state machine.
    state: OutputState,
}
398
/// State machine backing [`FileOutputStream`].
enum OutputState {
    /// No write is in flight; `write` is accepted.
    Ready,
    /// A background write is in flight; the task resolves to the number of
    /// bytes written or to the I/O error that stopped it.
    Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
    /// A background write failed; the error is reported once (by `flush`,
    /// `check_write` or `blocking_write_and_flush`) and the stream then becomes `Closed`.
    Error(io::Error),
    /// The stream no longer accepts operations; they return `StreamError::Closed`.
    Closed,
}
408
409impl FileOutputStream {
410 pub fn write_at(file: &File, position: u64) -> Self {
411 Self {
412 file: file.clone(),
413 mode: FileOutputMode::Position(position),
414 state: OutputState::Ready,
415 }
416 }
417
418 pub fn append(file: &File) -> Self {
419 Self {
420 file: file.clone(),
421 mode: FileOutputMode::Append,
422 state: OutputState::Ready,
423 }
424 }
425
426 fn blocking_write(
427 file: &cap_std::fs::File,
428 mut buf: Bytes,
429 mode: FileOutputMode,
430 ) -> io::Result<usize> {
431 use system_interface::fs::FileIoExt;
432
433 match mode {
434 FileOutputMode::Position(mut p) => {
435 let mut total = 0;
436 loop {
437 let nwritten = file.write_at(buf.as_ref(), p)?;
438 let _ = buf.split_to(nwritten);
440 p += nwritten as u64;
441 total += nwritten;
442 if buf.is_empty() {
443 break;
444 }
445 }
446 Ok(total)
447 }
448 FileOutputMode::Append => {
449 let mut total = 0;
450 loop {
451 let nwritten = file.append(buf.as_ref())?;
452 let _ = buf.split_to(nwritten);
453 total += nwritten;
454 if buf.is_empty() {
455 break;
456 }
457 }
458 Ok(total)
459 }
460 }
461 }
462}
463
/// Number of bytes (1 MiB) that `check_write` reports as writable while the
/// stream is in the `Ready` state.
const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
466
467#[async_trait::async_trait]
468impl OutputStream for FileOutputStream {
469 fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
470 match self.state {
471 OutputState::Ready => {}
472 OutputState::Closed => return Err(StreamError::Closed),
473 OutputState::Waiting(_) | OutputState::Error(_) => {
474 return Err(StreamError::Trap(anyhow!(
476 "write not permitted: check_write not called first"
477 )));
478 }
479 }
480
481 let m = self.mode;
482 self.state = OutputState::Waiting(
483 self.file
484 .spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
485 );
486 Ok(())
487 }
488 async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
491 self.ready().await;
492
493 match self.state {
494 OutputState::Ready => {}
495 OutputState::Closed => return Err(StreamError::Closed),
496 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
497 OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
498 _ => unreachable!(),
499 },
500 OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
501 }
502
503 let m = self.mode;
504 match self
505 .file
506 .run_blocking(move |f| Self::blocking_write(f, buf, m))
507 .await
508 {
509 Ok(nwritten) => {
510 if let FileOutputMode::Position(p) = &mut self.mode {
511 *p += nwritten as u64;
512 }
513 self.state = OutputState::Ready;
514 Ok(())
515 }
516 Err(e) => {
517 self.state = OutputState::Closed;
518 Err(StreamError::LastOperationFailed(e.into()))
519 }
520 }
521 }
522 fn flush(&mut self) -> Result<(), StreamError> {
523 match self.state {
524 OutputState::Ready | OutputState::Waiting(_) => Ok(()),
528 OutputState::Closed => Err(StreamError::Closed),
529 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
530 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
531 _ => unreachable!(),
532 },
533 }
534 }
535 fn check_write(&mut self) -> Result<usize, StreamError> {
536 match self.state {
537 OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
538 OutputState::Closed => Err(StreamError::Closed),
539 OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
540 OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
541 _ => unreachable!(),
542 },
543 OutputState::Waiting(_) => Ok(0),
544 }
545 }
546 async fn cancel(&mut self) {
547 match mem::replace(&mut self.state, OutputState::Closed) {
548 OutputState::Waiting(task) => {
549 task.cancel().await;
560 }
561 _ => {}
562 }
563 }
564}
565
566#[async_trait::async_trait]
567impl Pollable for FileOutputStream {
568 async fn ready(&mut self) {
569 if let OutputState::Waiting(task) = &mut self.state {
570 self.state = match task.await {
571 Ok(nwritten) => {
572 if let FileOutputMode::Position(p) = &mut self.mode {
573 *p += nwritten as u64;
574 }
575 OutputState::Ready
576 }
577 Err(e) => OutputState::Error(e),
578 };
579 }
580 }
581}
582
/// A boxed iterator over directory entries, wrapped in a `Mutex` so that
/// `next` can advance it through a shared reference.
pub struct ReaddirIterator(
    std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
);
586
587impl ReaddirIterator {
588 pub(crate) fn new(
589 i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
590 ) -> Self {
591 ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
592 }
593 pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
594 self.0.lock().unwrap().next().transpose()
595 }
596}
597
598impl IntoIterator for ReaddirIterator {
599 type Item = FsResult<types::DirectoryEntry>;
600 type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
601
602 fn into_iter(self) -> Self::IntoIter {
603 self.0.into_inner().unwrap()
604 }
605}