broker_tokio/fs/
file.rs

1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use self::State::*;
6use crate::fs::{asyncify, sys};
7use crate::io::blocking::Buf;
8use crate::io::{AsyncRead, AsyncSeek, AsyncWrite};
9
10use std::fmt;
11use std::fs::{Metadata, Permissions};
12use std::future::Future;
13use std::io::{self, Seek, SeekFrom};
14use std::path::Path;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::Context;
18use std::task::Poll;
19use std::task::Poll::*;
20
21/// A reference to an open file on the filesystem.
22///
23/// This is a specialized version of [`std::fs::File`][std] for usage from the
24/// Tokio runtime.
25///
26/// An instance of a `File` can be read and/or written depending on what options
27/// it was opened with. Files also implement Seek to alter the logical cursor
28/// that the file contains internally.
29///
30/// Files are automatically closed when they go out of scope.
31///
32/// [std]: std::fs::File
33///
34/// # Examples
35///
36/// Create a new file and asynchronously write bytes to it:
37///
38/// ```no_run
39/// use tokio::fs::File;
40/// use tokio::prelude::*;
41///
42/// # async fn dox() -> std::io::Result<()> {
43/// let mut file = File::create("foo.txt").await?;
44/// file.write_all(b"hello, world!").await?;
45/// # Ok(())
46/// # }
47/// ```
48///
49/// Read the contents of a file into a buffer
50///
51/// ```no_run
52/// use tokio::fs::File;
53/// use tokio::prelude::*;
54///
55/// # async fn dox() -> std::io::Result<()> {
56/// let mut file = File::open("foo.txt").await?;
57///
58/// let mut contents = vec![];
59/// file.read_to_end(&mut contents).await?;
60///
61/// println!("len = {}", contents.len());
62/// # Ok(())
63/// # }
64/// ```
65pub struct File {
66    std: Arc<sys::File>,
67    state: State,
68
69    /// Errors from writes/flushes are returned in write/flush calls. If a write
70    /// error is observed while performing a read, it is saved until the next
71    /// write / flush call.
72    last_write_err: Option<io::ErrorKind>,
73}
74
75#[derive(Debug)]
76enum State {
77    Idle(Option<Buf>),
78    Busy(sys::Blocking<(Operation, Buf)>),
79}
80
81#[derive(Debug)]
82enum Operation {
83    Read(io::Result<usize>),
84    Write(io::Result<()>),
85    Seek(io::Result<u64>),
86}
87
88impl File {
89    /// Attempts to open a file in read-only mode.
90    ///
91    /// See [`OpenOptions`] for more details.
92    ///
93    /// [`OpenOptions`]: super::OpenOptions
94    ///
95    /// # Errors
96    ///
97    /// This function will return an error if called from outside of the Tokio
98    /// runtime or if path does not already exist. Other errors may also be
99    /// returned according to OpenOptions::open.
100    ///
101    /// # Examples
102    ///
103    /// ```no_run
104    /// use tokio::fs::File;
105    /// use tokio::prelude::*;
106    ///
107    /// # async fn dox() -> std::io::Result<()> {
108    /// let mut file = File::open("foo.txt").await?;
109    ///
110    /// let mut contents = vec![];
111    /// file.read_to_end(&mut contents).await?;
112    ///
113    /// println!("len = {}", contents.len());
114    /// # Ok(())
115    /// # }
116    /// ```
117    pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
118        let path = path.as_ref().to_owned();
119        let std = asyncify(|| sys::File::open(path)).await?;
120
121        Ok(File::from_std(std))
122    }
123
124    /// Opens a file in write-only mode.
125    ///
126    /// This function will create a file if it does not exist, and will truncate
127    /// it if it does.
128    ///
129    /// See [`OpenOptions`] for more details.
130    ///
131    /// [`OpenOptions`]: super::OpenOptions
132    ///
133    /// # Errors
134    ///
135    /// Results in an error if called from outside of the Tokio runtime or if
136    /// the underlying [`create`] call results in an error.
137    ///
138    /// [`create`]: std::fs::File::create
139    ///
140    /// # Examples
141    ///
142    /// ```no_run
143    /// use tokio::fs::File;
144    /// use tokio::prelude::*;
145    ///
146    /// # async fn dox() -> std::io::Result<()> {
147    /// let mut file = File::create("foo.txt").await?;
148    /// file.write_all(b"hello, world!").await?;
149    /// # Ok(())
150    /// # }
151    /// ```
152    pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
153        let path = path.as_ref().to_owned();
154        let std_file = asyncify(move || sys::File::create(path)).await?;
155        Ok(File::from_std(std_file))
156    }
157
158    /// Convert a [`std::fs::File`][std] to a [`tokio::fs::File`][file].
159    ///
160    /// [std]: std::fs::File
161    /// [file]: File
162    ///
163    /// # Examples
164    ///
165    /// ```no_run
166    /// // This line could block. It is not recommended to do this on the Tokio
167    /// // runtime.
168    /// let std_file = std::fs::File::open("foo.txt").unwrap();
169    /// let file = tokio::fs::File::from_std(std_file);
170    /// ```
171    pub fn from_std(std: sys::File) -> File {
172        File {
173            std: Arc::new(std),
174            state: State::Idle(Some(Buf::with_capacity(0))),
175            last_write_err: None,
176        }
177    }
178
179    /// Seek to an offset, in bytes, in a stream.
180    ///
181    /// # Examples
182    ///
183    /// ```no_run
184    /// use tokio::fs::File;
185    /// use tokio::prelude::*;
186    ///
187    /// use std::io::SeekFrom;
188    ///
189    /// # async fn dox() -> std::io::Result<()> {
190    /// let mut file = File::open("foo.txt").await?;
191    /// file.seek(SeekFrom::Start(6)).await?;
192    ///
193    /// let mut contents = vec![0u8; 10];
194    /// file.read_exact(&mut contents).await?;
195    /// # Ok(())
196    /// # }
197    /// ```
198    pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
199        self.complete_inflight().await;
200
201        let mut buf = match self.state {
202            Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
203            _ => unreachable!(),
204        };
205
206        // Factor in any unread data from the buf
207        if !buf.is_empty() {
208            let n = buf.discard_read();
209
210            if let SeekFrom::Current(ref mut offset) = pos {
211                *offset += n;
212            }
213        }
214
215        let std = self.std.clone();
216
217        // Start the operation
218        self.state = Busy(sys::run(move || {
219            let res = (&*std).seek(pos);
220            (Operation::Seek(res), buf)
221        }));
222
223        let (op, buf) = match self.state {
224            Idle(_) => unreachable!(),
225            Busy(ref mut rx) => rx.await.unwrap(),
226        };
227
228        self.state = Idle(Some(buf));
229
230        match op {
231            Operation::Seek(res) => res,
232            _ => unreachable!(),
233        }
234    }
235
236    /// Attempts to sync all OS-internal metadata to disk.
237    ///
238    /// This function will attempt to ensure that all in-core data reaches the
239    /// filesystem before returning.
240    ///
241    /// # Examples
242    ///
243    /// ```no_run
244    /// use tokio::fs::File;
245    /// use tokio::prelude::*;
246    ///
247    /// # async fn dox() -> std::io::Result<()> {
248    /// let mut file = File::create("foo.txt").await?;
249    /// file.write_all(b"hello, world!").await?;
250    /// file.sync_all().await?;
251    /// # Ok(())
252    /// # }
253    /// ```
254    pub async fn sync_all(&mut self) -> io::Result<()> {
255        self.complete_inflight().await;
256
257        let std = self.std.clone();
258        asyncify(move || std.sync_all()).await
259    }
260
261    /// This function is similar to `sync_all`, except that it may not
262    /// synchronize file metadata to the filesystem.
263    ///
264    /// This is intended for use cases that must synchronize content, but don't
265    /// need the metadata on disk. The goal of this method is to reduce disk
266    /// operations.
267    ///
268    /// Note that some platforms may simply implement this in terms of `sync_all`.
269    ///
270    /// # Examples
271    ///
272    /// ```no_run
273    /// use tokio::fs::File;
274    /// use tokio::prelude::*;
275    ///
276    /// # async fn dox() -> std::io::Result<()> {
277    /// let mut file = File::create("foo.txt").await?;
278    /// file.write_all(b"hello, world!").await?;
279    /// file.sync_data().await?;
280    /// # Ok(())
281    /// # }
282    /// ```
283    pub async fn sync_data(&mut self) -> io::Result<()> {
284        self.complete_inflight().await;
285
286        let std = self.std.clone();
287        asyncify(move || std.sync_data()).await
288    }
289
290    /// Truncates or extends the underlying file, updating the size of this file to become size.
291    ///
292    /// If the size is less than the current file's size, then the file will be
293    /// shrunk. If it is greater than the current file's size, then the file
294    /// will be extended to size and have all of the intermediate data filled in
295    /// with 0s.
296    ///
297    /// # Errors
298    ///
299    /// This function will return an error if the file is not opened for
300    /// writing.
301    ///
302    /// # Examples
303    ///
304    /// ```no_run
305    /// use tokio::fs::File;
306    /// use tokio::prelude::*;
307    ///
308    /// # async fn dox() -> std::io::Result<()> {
309    /// let mut file = File::create("foo.txt").await?;
310    /// file.write_all(b"hello, world!").await?;
311    /// file.set_len(10).await?;
312    /// # Ok(())
313    /// # }
314    /// ```
315    pub async fn set_len(&mut self, size: u64) -> io::Result<()> {
316        self.complete_inflight().await;
317
318        let mut buf = match self.state {
319            Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
320            _ => unreachable!(),
321        };
322
323        let seek = if !buf.is_empty() {
324            Some(SeekFrom::Current(buf.discard_read()))
325        } else {
326            None
327        };
328
329        let std = self.std.clone();
330
331        self.state = Busy(sys::run(move || {
332            let res = if let Some(seek) = seek {
333                (&*std).seek(seek).and_then(|_| std.set_len(size))
334            } else {
335                std.set_len(size)
336            }
337            .map(|_| 0); // the value is discarded later
338
339            // Return the result as a seek
340            (Operation::Seek(res), buf)
341        }));
342
343        let (op, buf) = match self.state {
344            Idle(_) => unreachable!(),
345            Busy(ref mut rx) => rx.await?,
346        };
347
348        self.state = Idle(Some(buf));
349
350        match op {
351            Operation::Seek(res) => res.map(|_| ()),
352            _ => unreachable!(),
353        }
354    }
355
356    /// Queries metadata about the underlying file.
357    ///
358    /// # Examples
359    ///
360    /// ```no_run
361    /// use tokio::fs::File;
362    ///
363    /// # async fn dox() -> std::io::Result<()> {
364    /// let file = File::open("foo.txt").await?;
365    /// let metadata = file.metadata().await?;
366    ///
367    /// println!("{:?}", metadata);
368    /// # Ok(())
369    /// # }
370    /// ```
371    pub async fn metadata(&self) -> io::Result<Metadata> {
372        let std = self.std.clone();
373        asyncify(move || std.metadata()).await
374    }
375
376    /// Create a new `File` instance that shares the same underlying file handle
377    /// as the existing `File` instance. Reads, writes, and seeks will affect both
378    /// File instances simultaneously.
379    ///
380    /// # Examples
381    ///
382    /// ```no_run
383    /// use tokio::fs::File;
384    ///
385    /// # async fn dox() -> std::io::Result<()> {
386    /// let file = File::open("foo.txt").await?;
387    /// let file_clone = file.try_clone().await?;
388    /// # Ok(())
389    /// # }
390    /// ```
391    pub async fn try_clone(&self) -> io::Result<File> {
392        let std = self.std.clone();
393        let std_file = asyncify(move || std.try_clone()).await?;
394        Ok(File::from_std(std_file))
395    }
396
397    /// Destructures `File` into a [`std::fs::File`][std]. This function is
398    /// async to allow any in-flight operations to complete.
399    ///
400    /// Use `File::try_into_std` to attempt conversion immediately.
401    ///
402    /// [std]: std::fs::File
403    ///
404    /// # Examples
405    ///
406    /// ```no_run
407    /// use tokio::fs::File;
408    ///
409    /// # async fn dox() -> std::io::Result<()> {
410    /// let tokio_file = File::open("foo.txt").await?;
411    /// let std_file = tokio_file.into_std().await;
412    /// # Ok(())
413    /// # }
414    /// ```
415    pub async fn into_std(mut self) -> sys::File {
416        self.complete_inflight().await;
417        Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
418    }
419
420    /// Tries to immediately destructure `File` into a [`std::fs::File`][std].
421    ///
422    /// [std]: std::fs::File
423    ///
424    /// # Errors
425    ///
426    /// This function will return an error containing the file if some
427    /// operation is in-flight.
428    ///
429    /// # Examples
430    ///
431    /// ```no_run
432    /// use tokio::fs::File;
433    ///
434    /// # async fn dox() -> std::io::Result<()> {
435    /// let tokio_file = File::open("foo.txt").await?;
436    /// let std_file = tokio_file.try_into_std().unwrap();
437    /// # Ok(())
438    /// # }
439    /// ```
440    pub fn try_into_std(mut self) -> Result<sys::File, Self> {
441        match Arc::try_unwrap(self.std) {
442            Ok(file) => Ok(file),
443            Err(std_file_arc) => {
444                self.std = std_file_arc;
445                Err(self)
446            }
447        }
448    }
449
450    /// Changes the permissions on the underlying file.
451    ///
452    /// # Platform-specific behavior
453    ///
454    /// This function currently corresponds to the `fchmod` function on Unix and
455    /// the `SetFileInformationByHandle` function on Windows. Note that, this
456    /// [may change in the future][changes].
457    ///
458    /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
459    ///
460    /// # Errors
461    ///
462    /// This function will return an error if the user lacks permission change
463    /// attributes on the underlying file. It may also return an error in other
464    /// os-specific unspecified cases.
465    ///
466    /// # Examples
467    ///
468    /// ```no_run
469    /// use tokio::fs::File;
470    ///
471    /// # async fn dox() -> std::io::Result<()> {
472    /// let file = File::open("foo.txt").await?;
473    /// let mut perms = file.metadata().await?.permissions();
474    /// perms.set_readonly(true);
475    /// file.set_permissions(perms).await?;
476    /// # Ok(())
477    /// # }
478    /// ```
479    pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
480        let std = self.std.clone();
481        asyncify(move || std.set_permissions(perm)).await
482    }
483
484    async fn complete_inflight(&mut self) {
485        use crate::future::poll_fn;
486
487        if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
488            self.last_write_err = Some(e.kind());
489        }
490    }
491}
492
493impl AsyncRead for File {
494    fn poll_read(
495        mut self: Pin<&mut Self>,
496        cx: &mut Context<'_>,
497        dst: &mut [u8],
498    ) -> Poll<io::Result<usize>> {
499        loop {
500            match self.state {
501                Idle(ref mut buf_cell) => {
502                    let mut buf = buf_cell.take().unwrap();
503
504                    if !buf.is_empty() {
505                        let n = buf.copy_to(dst);
506                        *buf_cell = Some(buf);
507                        return Ready(Ok(n));
508                    }
509
510                    buf.ensure_capacity_for(dst);
511                    let std = self.std.clone();
512
513                    self.state = Busy(sys::run(move || {
514                        let res = buf.read_from(&mut &*std);
515                        (Operation::Read(res), buf)
516                    }));
517                }
518                Busy(ref mut rx) => {
519                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
520
521                    match op {
522                        Operation::Read(Ok(_)) => {
523                            let n = buf.copy_to(dst);
524                            self.state = Idle(Some(buf));
525                            return Ready(Ok(n));
526                        }
527                        Operation::Read(Err(e)) => {
528                            assert!(buf.is_empty());
529
530                            self.state = Idle(Some(buf));
531                            return Ready(Err(e));
532                        }
533                        Operation::Write(Ok(_)) => {
534                            assert!(buf.is_empty());
535                            self.state = Idle(Some(buf));
536                            continue;
537                        }
538                        Operation::Write(Err(e)) => {
539                            assert!(self.last_write_err.is_none());
540                            self.last_write_err = Some(e.kind());
541                            self.state = Idle(Some(buf));
542                        }
543                        Operation::Seek(_) => {
544                            assert!(buf.is_empty());
545                            self.state = Idle(Some(buf));
546                            continue;
547                        }
548                    }
549                }
550            }
551        }
552    }
553}
554
555impl AsyncSeek for File {
556    fn start_seek(
557        mut self: Pin<&mut Self>,
558        cx: &mut Context<'_>,
559        mut pos: SeekFrom,
560    ) -> Poll<io::Result<()>> {
561        loop {
562            match self.state {
563                Idle(ref mut buf_cell) => {
564                    let mut buf = buf_cell.take().unwrap();
565
566                    // Factor in any unread data from the buf
567                    if !buf.is_empty() {
568                        let n = buf.discard_read();
569
570                        if let SeekFrom::Current(ref mut offset) = pos {
571                            *offset += n;
572                        }
573                    }
574
575                    let std = self.std.clone();
576
577                    self.state = Busy(sys::run(move || {
578                        let res = (&*std).seek(pos);
579                        (Operation::Seek(res), buf)
580                    }));
581
582                    return Ready(Ok(()));
583                }
584                Busy(ref mut rx) => {
585                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
586                    self.state = Idle(Some(buf));
587
588                    match op {
589                        Operation::Read(_) => {}
590                        Operation::Write(Err(e)) => {
591                            assert!(self.last_write_err.is_none());
592                            self.last_write_err = Some(e.kind());
593                        }
594                        Operation::Write(_) => {}
595                        Operation::Seek(_) => {}
596                    }
597                }
598            }
599        }
600    }
601
602    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
603        loop {
604            match self.state {
605                Idle(_) => panic!("must call start_seek before calling poll_complete"),
606                Busy(ref mut rx) => {
607                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
608                    self.state = Idle(Some(buf));
609
610                    match op {
611                        Operation::Read(_) => {}
612                        Operation::Write(Err(e)) => {
613                            assert!(self.last_write_err.is_none());
614                            self.last_write_err = Some(e.kind());
615                        }
616                        Operation::Write(_) => {}
617                        Operation::Seek(res) => return Ready(res),
618                    }
619                }
620            }
621        }
622    }
623}
624
625impl AsyncWrite for File {
626    fn poll_write(
627        mut self: Pin<&mut Self>,
628        cx: &mut Context<'_>,
629        src: &[u8],
630    ) -> Poll<io::Result<usize>> {
631        if let Some(e) = self.last_write_err.take() {
632            return Ready(Err(e.into()));
633        }
634
635        loop {
636            match self.state {
637                Idle(ref mut buf_cell) => {
638                    let mut buf = buf_cell.take().unwrap();
639
640                    let seek = if !buf.is_empty() {
641                        Some(SeekFrom::Current(buf.discard_read()))
642                    } else {
643                        None
644                    };
645
646                    let n = buf.copy_from(src);
647                    let std = self.std.clone();
648
649                    self.state = Busy(sys::run(move || {
650                        let res = if let Some(seek) = seek {
651                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
652                        } else {
653                            buf.write_to(&mut &*std)
654                        };
655
656                        (Operation::Write(res), buf)
657                    }));
658
659                    return Ready(Ok(n));
660                }
661                Busy(ref mut rx) => {
662                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
663                    self.state = Idle(Some(buf));
664
665                    match op {
666                        Operation::Read(_) => {
667                            // We don't care about the result here. The fact
668                            // that the cursor has advanced will be reflected in
669                            // the next iteration of the loop
670                            continue;
671                        }
672                        Operation::Write(res) => {
673                            // If the previous write was successful, continue.
674                            // Otherwise, error.
675                            res?;
676                            continue;
677                        }
678                        Operation::Seek(_) => {
679                            // Ignore the seek
680                            continue;
681                        }
682                    }
683                }
684            }
685        }
686    }
687
688    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
689        if let Some(e) = self.last_write_err.take() {
690            return Ready(Err(e.into()));
691        }
692
693        let (op, buf) = match self.state {
694            Idle(_) => return Ready(Ok(())),
695            Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
696        };
697
698        // The buffer is not used here
699        self.state = Idle(Some(buf));
700
701        match op {
702            Operation::Read(_) => Ready(Ok(())),
703            Operation::Write(res) => Ready(res),
704            Operation::Seek(_) => Ready(Ok(())),
705        }
706    }
707
708    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
709        Poll::Ready(Ok(()))
710    }
711}
712
713impl From<sys::File> for File {
714    fn from(std: sys::File) -> Self {
715        Self::from_std(std)
716    }
717}
718
719impl fmt::Debug for File {
720    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
721        fmt.debug_struct("tokio::fs::File")
722            .field("std", &self.std)
723            .finish()
724    }
725}
726
727#[cfg(unix)]
728impl std::os::unix::io::AsRawFd for File {
729    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
730        self.std.as_raw_fd()
731    }
732}
733
734#[cfg(windows)]
735impl std::os::windows::io::AsRawHandle for File {
736    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
737        self.std.as_raw_handle()
738    }
739}