madsim_real_tokio/fs/
file.rs

1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::fmt;
11use std::fs::{Metadata, Permissions};
12use std::future::Future;
13use std::io::{self, Seek, SeekFrom};
14use std::path::Path;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::Context;
18use std::task::Poll;
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33/// A reference to an open file on the filesystem.
34///
35/// This is a specialized version of [`std::fs::File`] for usage from the
36/// Tokio runtime.
37///
38/// An instance of a `File` can be read and/or written depending on what options
39/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
40/// cursor that the file contains internally.
41///
42/// A file will not be closed immediately when it goes out of scope if there
43/// are any IO operations that have not yet completed. To ensure that a file is
44/// closed immediately when it is dropped, you should call [`flush`] before
45/// dropping it. Note that this does not ensure that the file has been fully
46/// written to disk; the operating system might keep the changes around in an
47/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
48/// the data to disk.
49///
50/// Reading and writing to a `File` is usually done using the convenience
51/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
52///
53/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
54/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
55/// [`sync_all`]: fn@crate::fs::File::sync_all
56/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
57/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
58///
59/// # Examples
60///
61/// Create a new file and asynchronously write bytes to it:
62///
63/// ```no_run
64/// use tokio::fs::File;
65/// use tokio::io::AsyncWriteExt; // for write_all()
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let mut file = File::create("foo.txt").await?;
69/// file.write_all(b"hello, world!").await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Read the contents of a file into a buffer:
75///
76/// ```no_run
77/// use tokio::fs::File;
78/// use tokio::io::AsyncReadExt; // for read_to_end()
79///
80/// # async fn dox() -> std::io::Result<()> {
81/// let mut file = File::open("foo.txt").await?;
82///
83/// let mut contents = vec![];
84/// file.read_to_end(&mut contents).await?;
85///
86/// println!("len = {}", contents.len());
87/// # Ok(())
88/// # }
89/// ```
90pub struct File {
91    std: Arc<StdFile>,
92    inner: Mutex<Inner>,
93    max_buf_size: usize,
94}
95
96struct Inner {
97    state: State,
98
99    /// Errors from writes/flushes are returned in write/flush calls. If a write
100    /// error is observed while performing a read, it is saved until the next
101    /// write / flush call.
102    last_write_err: Option<io::ErrorKind>,
103
104    pos: u64,
105}
106
107#[derive(Debug)]
108enum State {
109    Idle(Option<Buf>),
110    Busy(JoinHandle<(Operation, Buf)>),
111}
112
113#[derive(Debug)]
114enum Operation {
115    Read(io::Result<usize>),
116    Write(io::Result<()>),
117    Seek(io::Result<u64>),
118}
119
120impl File {
121    /// Attempts to open a file in read-only mode.
122    ///
123    /// See [`OpenOptions`] for more details.
124    ///
125    /// # Errors
126    ///
127    /// This function will return an error if called from outside of the Tokio
128    /// runtime or if path does not already exist. Other errors may also be
129    /// returned according to `OpenOptions::open`.
130    ///
131    /// # Examples
132    ///
133    /// ```no_run
134    /// use tokio::fs::File;
135    /// use tokio::io::AsyncReadExt;
136    ///
137    /// # async fn dox() -> std::io::Result<()> {
138    /// let mut file = File::open("foo.txt").await?;
139    ///
140    /// let mut contents = vec![];
141    /// file.read_to_end(&mut contents).await?;
142    ///
143    /// println!("len = {}", contents.len());
144    /// # Ok(())
145    /// # }
146    /// ```
147    ///
148    /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
149    ///
150    /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
151    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
152    pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
153        let path = path.as_ref().to_owned();
154        let std = asyncify(|| StdFile::open(path)).await?;
155
156        Ok(File::from_std(std))
157    }
158
159    /// Opens a file in write-only mode.
160    ///
161    /// This function will create a file if it does not exist, and will truncate
162    /// it if it does.
163    ///
164    /// See [`OpenOptions`] for more details.
165    ///
166    /// # Errors
167    ///
168    /// Results in an error if called from outside of the Tokio runtime or if
169    /// the underlying [`create`] call results in an error.
170    ///
171    /// [`create`]: std::fs::File::create
172    ///
173    /// # Examples
174    ///
175    /// ```no_run
176    /// use tokio::fs::File;
177    /// use tokio::io::AsyncWriteExt;
178    ///
179    /// # async fn dox() -> std::io::Result<()> {
180    /// let mut file = File::create("foo.txt").await?;
181    /// file.write_all(b"hello, world!").await?;
182    /// # Ok(())
183    /// # }
184    /// ```
185    ///
186    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
187    ///
188    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
189    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
190    pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
191        let path = path.as_ref().to_owned();
192        let std_file = asyncify(move || StdFile::create(path)).await?;
193        Ok(File::from_std(std_file))
194    }
195
196    /// Returns a new [`OpenOptions`] object.
197    ///
198    /// This function returns a new `OpenOptions` object that you can use to
199    /// open or create a file with specific options if `open()` or `create()`
200    /// are not appropriate.
201    ///
202    /// It is equivalent to `OpenOptions::new()`, but allows you to write more
203    /// readable code. Instead of
204    /// `OpenOptions::new().append(true).open("example.log")`,
205    /// you can write `File::options().append(true).open("example.log")`. This
206    /// also avoids the need to import `OpenOptions`.
207    ///
208    /// See the [`OpenOptions::new`] function for more details.
209    ///
210    /// # Examples
211    ///
212    /// ```no_run
213    /// use tokio::fs::File;
214    /// use tokio::io::AsyncWriteExt;
215    ///
216    /// # async fn dox() -> std::io::Result<()> {
217    /// let mut f = File::options().append(true).open("example.log").await?;
218    /// f.write_all(b"new line\n").await?;
219    /// # Ok(())
220    /// # }
221    /// ```
222    #[must_use]
223    pub fn options() -> OpenOptions {
224        OpenOptions::new()
225    }
226
227    /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
228    ///
229    /// # Examples
230    ///
231    /// ```no_run
232    /// // This line could block. It is not recommended to do this on the Tokio
233    /// // runtime.
234    /// let std_file = std::fs::File::open("foo.txt").unwrap();
235    /// let file = tokio::fs::File::from_std(std_file);
236    /// ```
237    pub fn from_std(std: StdFile) -> File {
238        File {
239            std: Arc::new(std),
240            inner: Mutex::new(Inner {
241                state: State::Idle(Some(Buf::with_capacity(0))),
242                last_write_err: None,
243                pos: 0,
244            }),
245            max_buf_size: DEFAULT_MAX_BUF_SIZE,
246        }
247    }
248
249    /// Attempts to sync all OS-internal metadata to disk.
250    ///
251    /// This function will attempt to ensure that all in-core data reaches the
252    /// filesystem before returning.
253    ///
254    /// # Examples
255    ///
256    /// ```no_run
257    /// use tokio::fs::File;
258    /// use tokio::io::AsyncWriteExt;
259    ///
260    /// # async fn dox() -> std::io::Result<()> {
261    /// let mut file = File::create("foo.txt").await?;
262    /// file.write_all(b"hello, world!").await?;
263    /// file.sync_all().await?;
264    /// # Ok(())
265    /// # }
266    /// ```
267    ///
268    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
269    ///
270    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
271    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
272    pub async fn sync_all(&self) -> io::Result<()> {
273        let mut inner = self.inner.lock().await;
274        inner.complete_inflight().await;
275
276        let std = self.std.clone();
277        asyncify(move || std.sync_all()).await
278    }
279
280    /// This function is similar to `sync_all`, except that it may not
281    /// synchronize file metadata to the filesystem.
282    ///
283    /// This is intended for use cases that must synchronize content, but don't
284    /// need the metadata on disk. The goal of this method is to reduce disk
285    /// operations.
286    ///
287    /// Note that some platforms may simply implement this in terms of `sync_all`.
288    ///
289    /// # Examples
290    ///
291    /// ```no_run
292    /// use tokio::fs::File;
293    /// use tokio::io::AsyncWriteExt;
294    ///
295    /// # async fn dox() -> std::io::Result<()> {
296    /// let mut file = File::create("foo.txt").await?;
297    /// file.write_all(b"hello, world!").await?;
298    /// file.sync_data().await?;
299    /// # Ok(())
300    /// # }
301    /// ```
302    ///
303    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
304    ///
305    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
306    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
307    pub async fn sync_data(&self) -> io::Result<()> {
308        let mut inner = self.inner.lock().await;
309        inner.complete_inflight().await;
310
311        let std = self.std.clone();
312        asyncify(move || std.sync_data()).await
313    }
314
315    /// Truncates or extends the underlying file, updating the size of this file to become size.
316    ///
317    /// If the size is less than the current file's size, then the file will be
318    /// shrunk. If it is greater than the current file's size, then the file
319    /// will be extended to size and have all of the intermediate data filled in
320    /// with 0s.
321    ///
322    /// # Errors
323    ///
324    /// This function will return an error if the file is not opened for
325    /// writing.
326    ///
327    /// # Examples
328    ///
329    /// ```no_run
330    /// use tokio::fs::File;
331    /// use tokio::io::AsyncWriteExt;
332    ///
333    /// # async fn dox() -> std::io::Result<()> {
334    /// let mut file = File::create("foo.txt").await?;
335    /// file.write_all(b"hello, world!").await?;
336    /// file.set_len(10).await?;
337    /// # Ok(())
338    /// # }
339    /// ```
340    ///
341    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
342    ///
343    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
344    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
345    pub async fn set_len(&self, size: u64) -> io::Result<()> {
346        let mut inner = self.inner.lock().await;
347        inner.complete_inflight().await;
348
349        let mut buf = match inner.state {
350            State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
351            _ => unreachable!(),
352        };
353
354        let seek = if !buf.is_empty() {
355            Some(SeekFrom::Current(buf.discard_read()))
356        } else {
357            None
358        };
359
360        let std = self.std.clone();
361
362        inner.state = State::Busy(spawn_blocking(move || {
363            let res = if let Some(seek) = seek {
364                (&*std).seek(seek).and_then(|_| std.set_len(size))
365            } else {
366                std.set_len(size)
367            }
368            .map(|()| 0); // the value is discarded later
369
370            // Return the result as a seek
371            (Operation::Seek(res), buf)
372        }));
373
374        let (op, buf) = match inner.state {
375            State::Idle(_) => unreachable!(),
376            State::Busy(ref mut rx) => rx.await?,
377        };
378
379        inner.state = State::Idle(Some(buf));
380
381        match op {
382            Operation::Seek(res) => res.map(|pos| {
383                inner.pos = pos;
384            }),
385            _ => unreachable!(),
386        }
387    }
388
389    /// Queries metadata about the underlying file.
390    ///
391    /// # Examples
392    ///
393    /// ```no_run
394    /// use tokio::fs::File;
395    ///
396    /// # async fn dox() -> std::io::Result<()> {
397    /// let file = File::open("foo.txt").await?;
398    /// let metadata = file.metadata().await?;
399    ///
400    /// println!("{:?}", metadata);
401    /// # Ok(())
402    /// # }
403    /// ```
404    pub async fn metadata(&self) -> io::Result<Metadata> {
405        let std = self.std.clone();
406        asyncify(move || std.metadata()).await
407    }
408
409    /// Creates a new `File` instance that shares the same underlying file handle
410    /// as the existing `File` instance. Reads, writes, and seeks will affect both
411    /// File instances simultaneously.
412    ///
413    /// # Examples
414    ///
415    /// ```no_run
416    /// use tokio::fs::File;
417    ///
418    /// # async fn dox() -> std::io::Result<()> {
419    /// let file = File::open("foo.txt").await?;
420    /// let file_clone = file.try_clone().await?;
421    /// # Ok(())
422    /// # }
423    /// ```
424    pub async fn try_clone(&self) -> io::Result<File> {
425        self.inner.lock().await.complete_inflight().await;
426        let std = self.std.clone();
427        let std_file = asyncify(move || std.try_clone()).await?;
428        Ok(File::from_std(std_file))
429    }
430
431    /// Destructures `File` into a [`std::fs::File`]. This function is
432    /// async to allow any in-flight operations to complete.
433    ///
434    /// Use `File::try_into_std` to attempt conversion immediately.
435    ///
436    /// # Examples
437    ///
438    /// ```no_run
439    /// use tokio::fs::File;
440    ///
441    /// # async fn dox() -> std::io::Result<()> {
442    /// let tokio_file = File::open("foo.txt").await?;
443    /// let std_file = tokio_file.into_std().await;
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub async fn into_std(mut self) -> StdFile {
448        self.inner.get_mut().complete_inflight().await;
449        Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
450    }
451
452    /// Tries to immediately destructure `File` into a [`std::fs::File`].
453    ///
454    /// # Errors
455    ///
456    /// This function will return an error containing the file if some
457    /// operation is in-flight.
458    ///
459    /// # Examples
460    ///
461    /// ```no_run
462    /// use tokio::fs::File;
463    ///
464    /// # async fn dox() -> std::io::Result<()> {
465    /// let tokio_file = File::open("foo.txt").await?;
466    /// let std_file = tokio_file.try_into_std().unwrap();
467    /// # Ok(())
468    /// # }
469    /// ```
470    pub fn try_into_std(mut self) -> Result<StdFile, Self> {
471        match Arc::try_unwrap(self.std) {
472            Ok(file) => Ok(file),
473            Err(std_file_arc) => {
474                self.std = std_file_arc;
475                Err(self)
476            }
477        }
478    }
479
480    /// Changes the permissions on the underlying file.
481    ///
482    /// # Platform-specific behavior
483    ///
484    /// This function currently corresponds to the `fchmod` function on Unix and
485    /// the `SetFileInformationByHandle` function on Windows. Note that, this
486    /// [may change in the future][changes].
487    ///
488    /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
489    ///
490    /// # Errors
491    ///
492    /// This function will return an error if the user lacks permission change
493    /// attributes on the underlying file. It may also return an error in other
494    /// os-specific unspecified cases.
495    ///
496    /// # Examples
497    ///
498    /// ```no_run
499    /// use tokio::fs::File;
500    ///
501    /// # async fn dox() -> std::io::Result<()> {
502    /// let file = File::open("foo.txt").await?;
503    /// let mut perms = file.metadata().await?.permissions();
504    /// perms.set_readonly(true);
505    /// file.set_permissions(perms).await?;
506    /// # Ok(())
507    /// # }
508    /// ```
509    pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
510        let std = self.std.clone();
511        asyncify(move || std.set_permissions(perm)).await
512    }
513
514    /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
515    ///
516    /// Although Tokio uses a sensible default value for this buffer size, this function would be
517    /// useful for changing that default depending on the situation.
518    ///
519    /// # Examples
520    ///
521    /// ```no_run
522    /// use tokio::fs::File;
523    /// use tokio::io::AsyncWriteExt;
524    ///
525    /// # async fn dox() -> std::io::Result<()> {
526    /// let mut file = File::open("foo.txt").await?;
527    ///
528    /// // Set maximum buffer size to 8 MiB
529    /// file.set_max_buf_size(8 * 1024 * 1024);
530    ///
531    /// let mut buf = vec![1; 1024 * 1024 * 1024];
532    ///
533    /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
534    /// file.write_all(&mut buf).await?;
535    /// # Ok(())
536    /// # }
537    /// ```
538    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
539        self.max_buf_size = max_buf_size;
540    }
541}
542
543impl AsyncRead for File {
544    fn poll_read(
545        self: Pin<&mut Self>,
546        cx: &mut Context<'_>,
547        dst: &mut ReadBuf<'_>,
548    ) -> Poll<io::Result<()>> {
549        ready!(crate::trace::trace_leaf(cx));
550        let me = self.get_mut();
551        let inner = me.inner.get_mut();
552
553        loop {
554            match inner.state {
555                State::Idle(ref mut buf_cell) => {
556                    let mut buf = buf_cell.take().unwrap();
557
558                    if !buf.is_empty() {
559                        buf.copy_to(dst);
560                        *buf_cell = Some(buf);
561                        return Poll::Ready(Ok(()));
562                    }
563
564                    buf.ensure_capacity_for(dst, me.max_buf_size);
565                    let std = me.std.clone();
566
567                    inner.state = State::Busy(spawn_blocking(move || {
568                        let res = buf.read_from(&mut &*std);
569                        (Operation::Read(res), buf)
570                    }));
571                }
572                State::Busy(ref mut rx) => {
573                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
574
575                    match op {
576                        Operation::Read(Ok(_)) => {
577                            buf.copy_to(dst);
578                            inner.state = State::Idle(Some(buf));
579                            return Poll::Ready(Ok(()));
580                        }
581                        Operation::Read(Err(e)) => {
582                            assert!(buf.is_empty());
583
584                            inner.state = State::Idle(Some(buf));
585                            return Poll::Ready(Err(e));
586                        }
587                        Operation::Write(Ok(())) => {
588                            assert!(buf.is_empty());
589                            inner.state = State::Idle(Some(buf));
590                            continue;
591                        }
592                        Operation::Write(Err(e)) => {
593                            assert!(inner.last_write_err.is_none());
594                            inner.last_write_err = Some(e.kind());
595                            inner.state = State::Idle(Some(buf));
596                        }
597                        Operation::Seek(result) => {
598                            assert!(buf.is_empty());
599                            inner.state = State::Idle(Some(buf));
600                            if let Ok(pos) = result {
601                                inner.pos = pos;
602                            }
603                            continue;
604                        }
605                    }
606                }
607            }
608        }
609    }
610}
611
612impl AsyncSeek for File {
613    fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
614        let me = self.get_mut();
615        let inner = me.inner.get_mut();
616
617        match inner.state {
618            State::Busy(_) => Err(io::Error::new(
619                io::ErrorKind::Other,
620                "other file operation is pending, call poll_complete before start_seek",
621            )),
622            State::Idle(ref mut buf_cell) => {
623                let mut buf = buf_cell.take().unwrap();
624
625                // Factor in any unread data from the buf
626                if !buf.is_empty() {
627                    let n = buf.discard_read();
628
629                    if let SeekFrom::Current(ref mut offset) = pos {
630                        *offset += n;
631                    }
632                }
633
634                let std = me.std.clone();
635
636                inner.state = State::Busy(spawn_blocking(move || {
637                    let res = (&*std).seek(pos);
638                    (Operation::Seek(res), buf)
639                }));
640                Ok(())
641            }
642        }
643    }
644
645    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
646        ready!(crate::trace::trace_leaf(cx));
647        let inner = self.inner.get_mut();
648
649        loop {
650            match inner.state {
651                State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
652                State::Busy(ref mut rx) => {
653                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
654                    inner.state = State::Idle(Some(buf));
655
656                    match op {
657                        Operation::Read(_) => {}
658                        Operation::Write(Err(e)) => {
659                            assert!(inner.last_write_err.is_none());
660                            inner.last_write_err = Some(e.kind());
661                        }
662                        Operation::Write(_) => {}
663                        Operation::Seek(res) => {
664                            if let Ok(pos) = res {
665                                inner.pos = pos;
666                            }
667                            return Poll::Ready(res);
668                        }
669                    }
670                }
671            }
672        }
673    }
674}
675
676impl AsyncWrite for File {
677    fn poll_write(
678        self: Pin<&mut Self>,
679        cx: &mut Context<'_>,
680        src: &[u8],
681    ) -> Poll<io::Result<usize>> {
682        ready!(crate::trace::trace_leaf(cx));
683        let me = self.get_mut();
684        let inner = me.inner.get_mut();
685
686        if let Some(e) = inner.last_write_err.take() {
687            return Poll::Ready(Err(e.into()));
688        }
689
690        loop {
691            match inner.state {
692                State::Idle(ref mut buf_cell) => {
693                    let mut buf = buf_cell.take().unwrap();
694
695                    let seek = if !buf.is_empty() {
696                        Some(SeekFrom::Current(buf.discard_read()))
697                    } else {
698                        None
699                    };
700
701                    let n = buf.copy_from(src, me.max_buf_size);
702                    let std = me.std.clone();
703
704                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
705                        let res = if let Some(seek) = seek {
706                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
707                        } else {
708                            buf.write_to(&mut &*std)
709                        };
710
711                        (Operation::Write(res), buf)
712                    })
713                    .ok_or_else(|| {
714                        io::Error::new(io::ErrorKind::Other, "background task failed")
715                    })?;
716
717                    inner.state = State::Busy(blocking_task_join_handle);
718
719                    return Poll::Ready(Ok(n));
720                }
721                State::Busy(ref mut rx) => {
722                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
723                    inner.state = State::Idle(Some(buf));
724
725                    match op {
726                        Operation::Read(_) => {
727                            // We don't care about the result here. The fact
728                            // that the cursor has advanced will be reflected in
729                            // the next iteration of the loop
730                            continue;
731                        }
732                        Operation::Write(res) => {
733                            // If the previous write was successful, continue.
734                            // Otherwise, error.
735                            res?;
736                            continue;
737                        }
738                        Operation::Seek(_) => {
739                            // Ignore the seek
740                            continue;
741                        }
742                    }
743                }
744            }
745        }
746    }
747
748    fn poll_write_vectored(
749        self: Pin<&mut Self>,
750        cx: &mut Context<'_>,
751        bufs: &[io::IoSlice<'_>],
752    ) -> Poll<Result<usize, io::Error>> {
753        ready!(crate::trace::trace_leaf(cx));
754        let me = self.get_mut();
755        let inner = me.inner.get_mut();
756
757        if let Some(e) = inner.last_write_err.take() {
758            return Poll::Ready(Err(e.into()));
759        }
760
761        loop {
762            match inner.state {
763                State::Idle(ref mut buf_cell) => {
764                    let mut buf = buf_cell.take().unwrap();
765
766                    let seek = if !buf.is_empty() {
767                        Some(SeekFrom::Current(buf.discard_read()))
768                    } else {
769                        None
770                    };
771
772                    let n = buf.copy_from_bufs(bufs, me.max_buf_size);
773                    let std = me.std.clone();
774
775                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
776                        let res = if let Some(seek) = seek {
777                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
778                        } else {
779                            buf.write_to(&mut &*std)
780                        };
781
782                        (Operation::Write(res), buf)
783                    })
784                    .ok_or_else(|| {
785                        io::Error::new(io::ErrorKind::Other, "background task failed")
786                    })?;
787
788                    inner.state = State::Busy(blocking_task_join_handle);
789
790                    return Poll::Ready(Ok(n));
791                }
792                State::Busy(ref mut rx) => {
793                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
794                    inner.state = State::Idle(Some(buf));
795
796                    match op {
797                        Operation::Read(_) => {
798                            // We don't care about the result here. The fact
799                            // that the cursor has advanced will be reflected in
800                            // the next iteration of the loop
801                            continue;
802                        }
803                        Operation::Write(res) => {
804                            // If the previous write was successful, continue.
805                            // Otherwise, error.
806                            res?;
807                            continue;
808                        }
809                        Operation::Seek(_) => {
810                            // Ignore the seek
811                            continue;
812                        }
813                    }
814                }
815            }
816        }
817    }
818
819    fn is_write_vectored(&self) -> bool {
820        true
821    }
822
823    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
824        ready!(crate::trace::trace_leaf(cx));
825        let inner = self.inner.get_mut();
826        inner.poll_flush(cx)
827    }
828
829    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
830        ready!(crate::trace::trace_leaf(cx));
831        self.poll_flush(cx)
832    }
833}
834
835impl From<StdFile> for File {
836    fn from(std: StdFile) -> Self {
837        Self::from_std(std)
838    }
839}
840
841impl fmt::Debug for File {
842    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
843        fmt.debug_struct("tokio::fs::File")
844            .field("std", &self.std)
845            .finish()
846    }
847}
848
849#[cfg(unix)]
850impl std::os::unix::io::AsRawFd for File {
851    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
852        self.std.as_raw_fd()
853    }
854}
855
856#[cfg(unix)]
857impl std::os::unix::io::AsFd for File {
858    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
859        unsafe {
860            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
861        }
862    }
863}
864
865#[cfg(unix)]
866impl std::os::unix::io::FromRawFd for File {
867    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
868        StdFile::from_raw_fd(fd).into()
869    }
870}
871
872cfg_windows! {
873    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
874
875    impl AsRawHandle for File {
876        fn as_raw_handle(&self) -> RawHandle {
877            self.std.as_raw_handle()
878        }
879    }
880
881    impl AsHandle for File {
882        fn as_handle(&self) -> BorrowedHandle<'_> {
883            unsafe {
884                BorrowedHandle::borrow_raw(
885                    AsRawHandle::as_raw_handle(self),
886                )
887            }
888        }
889    }
890
891    impl FromRawHandle for File {
892        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
893            StdFile::from_raw_handle(handle).into()
894        }
895    }
896}
897
898impl Inner {
899    async fn complete_inflight(&mut self) {
900        use crate::future::poll_fn;
901
902        poll_fn(|cx| self.poll_complete_inflight(cx)).await;
903    }
904
905    fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
906        ready!(crate::trace::trace_leaf(cx));
907        match self.poll_flush(cx) {
908            Poll::Ready(Err(e)) => {
909                self.last_write_err = Some(e.kind());
910                Poll::Ready(())
911            }
912            Poll::Ready(Ok(())) => Poll::Ready(()),
913            Poll::Pending => Poll::Pending,
914        }
915    }
916
917    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
918        if let Some(e) = self.last_write_err.take() {
919            return Poll::Ready(Err(e.into()));
920        }
921
922        let (op, buf) = match self.state {
923            State::Idle(_) => return Poll::Ready(Ok(())),
924            State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
925        };
926
927        // The buffer is not used here
928        self.state = State::Idle(Some(buf));
929
930        match op {
931            Operation::Read(_) => Poll::Ready(Ok(())),
932            Operation::Write(res) => Poll::Ready(res),
933            Operation::Seek(_) => Poll::Ready(Ok(())),
934        }
935    }
936}
937
938#[cfg(test)]
939mod tests;