madsim_real_tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::fmt;
11use std::fs::{Metadata, Permissions};
12use std::future::Future;
13use std::io::{self, Seek, SeekFrom};
14use std::path::Path;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::Context;
18use std::task::Poll;
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33/// A reference to an open file on the filesystem.
34///
35/// This is a specialized version of [`std::fs::File`] for usage from the
36/// Tokio runtime.
37///
38/// An instance of a `File` can be read and/or written depending on what options
39/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
40/// cursor that the file contains internally.
41///
42/// A file will not be closed immediately when it goes out of scope if there
43/// are any IO operations that have not yet completed. To ensure that a file is
44/// closed immediately when it is dropped, you should call [`flush`] before
45/// dropping it. Note that this does not ensure that the file has been fully
46/// written to disk; the operating system might keep the changes around in an
47/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
48/// the data to disk.
49///
50/// Reading and writing to a `File` is usually done using the convenience
51/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
52///
53/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
54/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
55/// [`sync_all`]: fn@crate::fs::File::sync_all
56/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
57/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
58///
59/// # Examples
60///
61/// Create a new file and asynchronously write bytes to it:
62///
63/// ```no_run
64/// use tokio::fs::File;
65/// use tokio::io::AsyncWriteExt; // for write_all()
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let mut file = File::create("foo.txt").await?;
69/// file.write_all(b"hello, world!").await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Read the contents of a file into a buffer:
75///
76/// ```no_run
77/// use tokio::fs::File;
78/// use tokio::io::AsyncReadExt; // for read_to_end()
79///
80/// # async fn dox() -> std::io::Result<()> {
81/// let mut file = File::open("foo.txt").await?;
82///
83/// let mut contents = vec![];
84/// file.read_to_end(&mut contents).await?;
85///
86/// println!("len = {}", contents.len());
87/// # Ok(())
88/// # }
89/// ```
pub struct File {
    /// Handle to the underlying file. The `Arc` is cloned into each blocking
    /// closure that performs an operation on the file.
    std: Arc<StdFile>,
    /// Operation state, deferred write error and last recorded seek position.
    inner: Mutex<Inner>,
    /// Limit passed to the buffer helpers when preparing reads and writes.
    max_buf_size: usize,
}
95
struct Inner {
    /// Whether an operation is in flight, and where the buffer currently lives.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Position taken from the most recent successful seek result. It is
    /// returned by `poll_complete` when no operation is in flight.
    pos: u64,
}
106
/// Tracks whether a blocking operation is outstanding for the file.
#[derive(Debug)]
enum State {
    /// No operation is in flight. The buffer is taken out of the `Option`
    /// while a new operation is being set up.
    Idle(Option<Buf>),
    /// A blocking task is running; when it finishes it yields the operation's
    /// result together with the buffer.
    Busy(JoinHandle<(Operation, Buf)>),
}
112
/// Result of a finished blocking operation, tagged by the kind of operation.
#[derive(Debug)]
enum Operation {
    /// Outcome of filling the buffer from the file.
    Read(io::Result<usize>),
    /// Outcome of writing the buffer to the file.
    Write(io::Result<()>),
    /// Outcome of a seek. `set_len` also reports its result through this variant.
    Seek(io::Result<u64>),
}
119
120impl File {
121 /// Attempts to open a file in read-only mode.
122 ///
123 /// See [`OpenOptions`] for more details.
124 ///
125 /// # Errors
126 ///
127 /// This function will return an error if called from outside of the Tokio
128 /// runtime or if path does not already exist. Other errors may also be
129 /// returned according to `OpenOptions::open`.
130 ///
131 /// # Examples
132 ///
133 /// ```no_run
134 /// use tokio::fs::File;
135 /// use tokio::io::AsyncReadExt;
136 ///
137 /// # async fn dox() -> std::io::Result<()> {
138 /// let mut file = File::open("foo.txt").await?;
139 ///
140 /// let mut contents = vec![];
141 /// file.read_to_end(&mut contents).await?;
142 ///
143 /// println!("len = {}", contents.len());
144 /// # Ok(())
145 /// # }
146 /// ```
147 ///
148 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
149 ///
150 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
151 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
152 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
153 let path = path.as_ref().to_owned();
154 let std = asyncify(|| StdFile::open(path)).await?;
155
156 Ok(File::from_std(std))
157 }
158
159 /// Opens a file in write-only mode.
160 ///
161 /// This function will create a file if it does not exist, and will truncate
162 /// it if it does.
163 ///
164 /// See [`OpenOptions`] for more details.
165 ///
166 /// # Errors
167 ///
168 /// Results in an error if called from outside of the Tokio runtime or if
169 /// the underlying [`create`] call results in an error.
170 ///
171 /// [`create`]: std::fs::File::create
172 ///
173 /// # Examples
174 ///
175 /// ```no_run
176 /// use tokio::fs::File;
177 /// use tokio::io::AsyncWriteExt;
178 ///
179 /// # async fn dox() -> std::io::Result<()> {
180 /// let mut file = File::create("foo.txt").await?;
181 /// file.write_all(b"hello, world!").await?;
182 /// # Ok(())
183 /// # }
184 /// ```
185 ///
186 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
187 ///
188 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
189 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
190 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
191 let path = path.as_ref().to_owned();
192 let std_file = asyncify(move || StdFile::create(path)).await?;
193 Ok(File::from_std(std_file))
194 }
195
196 /// Returns a new [`OpenOptions`] object.
197 ///
198 /// This function returns a new `OpenOptions` object that you can use to
199 /// open or create a file with specific options if `open()` or `create()`
200 /// are not appropriate.
201 ///
202 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
203 /// readable code. Instead of
204 /// `OpenOptions::new().append(true).open("example.log")`,
205 /// you can write `File::options().append(true).open("example.log")`. This
206 /// also avoids the need to import `OpenOptions`.
207 ///
208 /// See the [`OpenOptions::new`] function for more details.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// use tokio::fs::File;
214 /// use tokio::io::AsyncWriteExt;
215 ///
216 /// # async fn dox() -> std::io::Result<()> {
217 /// let mut f = File::options().append(true).open("example.log").await?;
218 /// f.write_all(b"new line\n").await?;
219 /// # Ok(())
220 /// # }
221 /// ```
    #[must_use]
    pub fn options() -> OpenOptions {
        // Thin alias for `OpenOptions::new()`.
        OpenOptions::new()
    }
226
227 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
228 ///
229 /// # Examples
230 ///
231 /// ```no_run
232 /// // This line could block. It is not recommended to do this on the Tokio
233 /// // runtime.
234 /// let std_file = std::fs::File::open("foo.txt").unwrap();
235 /// let file = tokio::fs::File::from_std(std_file);
236 /// ```
237 pub fn from_std(std: StdFile) -> File {
238 File {
239 std: Arc::new(std),
240 inner: Mutex::new(Inner {
241 state: State::Idle(Some(Buf::with_capacity(0))),
242 last_write_err: None,
243 pos: 0,
244 }),
245 max_buf_size: DEFAULT_MAX_BUF_SIZE,
246 }
247 }
248
249 /// Attempts to sync all OS-internal metadata to disk.
250 ///
251 /// This function will attempt to ensure that all in-core data reaches the
252 /// filesystem before returning.
253 ///
254 /// # Examples
255 ///
256 /// ```no_run
257 /// use tokio::fs::File;
258 /// use tokio::io::AsyncWriteExt;
259 ///
260 /// # async fn dox() -> std::io::Result<()> {
261 /// let mut file = File::create("foo.txt").await?;
262 /// file.write_all(b"hello, world!").await?;
263 /// file.sync_all().await?;
264 /// # Ok(())
265 /// # }
266 /// ```
267 ///
268 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
269 ///
270 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
271 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
272 pub async fn sync_all(&self) -> io::Result<()> {
273 let mut inner = self.inner.lock().await;
274 inner.complete_inflight().await;
275
276 let std = self.std.clone();
277 asyncify(move || std.sync_all()).await
278 }
279
280 /// This function is similar to `sync_all`, except that it may not
281 /// synchronize file metadata to the filesystem.
282 ///
283 /// This is intended for use cases that must synchronize content, but don't
284 /// need the metadata on disk. The goal of this method is to reduce disk
285 /// operations.
286 ///
287 /// Note that some platforms may simply implement this in terms of `sync_all`.
288 ///
289 /// # Examples
290 ///
291 /// ```no_run
292 /// use tokio::fs::File;
293 /// use tokio::io::AsyncWriteExt;
294 ///
295 /// # async fn dox() -> std::io::Result<()> {
296 /// let mut file = File::create("foo.txt").await?;
297 /// file.write_all(b"hello, world!").await?;
298 /// file.sync_data().await?;
299 /// # Ok(())
300 /// # }
301 /// ```
302 ///
303 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
304 ///
305 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
306 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
307 pub async fn sync_data(&self) -> io::Result<()> {
308 let mut inner = self.inner.lock().await;
309 inner.complete_inflight().await;
310
311 let std = self.std.clone();
312 asyncify(move || std.sync_data()).await
313 }
314
    /// Truncates or extends the underlying file, updating the size of this file to become `size`.
    ///
    /// If `size` is less than the current file's size, then the file will be
    /// shrunk. If it is greater than the current file's size, then the file
    /// will be extended to `size` and have all of the intermediate data filled in
    /// with 0s.
321 ///
322 /// # Errors
323 ///
324 /// This function will return an error if the file is not opened for
325 /// writing.
326 ///
327 /// # Examples
328 ///
329 /// ```no_run
330 /// use tokio::fs::File;
331 /// use tokio::io::AsyncWriteExt;
332 ///
333 /// # async fn dox() -> std::io::Result<()> {
334 /// let mut file = File::create("foo.txt").await?;
335 /// file.write_all(b"hello, world!").await?;
336 /// file.set_len(10).await?;
337 /// # Ok(())
338 /// # }
339 /// ```
340 ///
341 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
342 ///
343 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
344 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
345 pub async fn set_len(&self, size: u64) -> io::Result<()> {
346 let mut inner = self.inner.lock().await;
347 inner.complete_inflight().await;
348
349 let mut buf = match inner.state {
350 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
351 _ => unreachable!(),
352 };
353
354 let seek = if !buf.is_empty() {
355 Some(SeekFrom::Current(buf.discard_read()))
356 } else {
357 None
358 };
359
360 let std = self.std.clone();
361
362 inner.state = State::Busy(spawn_blocking(move || {
363 let res = if let Some(seek) = seek {
364 (&*std).seek(seek).and_then(|_| std.set_len(size))
365 } else {
366 std.set_len(size)
367 }
368 .map(|()| 0); // the value is discarded later
369
370 // Return the result as a seek
371 (Operation::Seek(res), buf)
372 }));
373
374 let (op, buf) = match inner.state {
375 State::Idle(_) => unreachable!(),
376 State::Busy(ref mut rx) => rx.await?,
377 };
378
379 inner.state = State::Idle(Some(buf));
380
381 match op {
382 Operation::Seek(res) => res.map(|pos| {
383 inner.pos = pos;
384 }),
385 _ => unreachable!(),
386 }
387 }
388
389 /// Queries metadata about the underlying file.
390 ///
391 /// # Examples
392 ///
393 /// ```no_run
394 /// use tokio::fs::File;
395 ///
396 /// # async fn dox() -> std::io::Result<()> {
397 /// let file = File::open("foo.txt").await?;
398 /// let metadata = file.metadata().await?;
399 ///
400 /// println!("{:?}", metadata);
401 /// # Ok(())
402 /// # }
403 /// ```
404 pub async fn metadata(&self) -> io::Result<Metadata> {
405 let std = self.std.clone();
406 asyncify(move || std.metadata()).await
407 }
408
409 /// Creates a new `File` instance that shares the same underlying file handle
410 /// as the existing `File` instance. Reads, writes, and seeks will affect both
411 /// File instances simultaneously.
412 ///
413 /// # Examples
414 ///
415 /// ```no_run
416 /// use tokio::fs::File;
417 ///
418 /// # async fn dox() -> std::io::Result<()> {
419 /// let file = File::open("foo.txt").await?;
420 /// let file_clone = file.try_clone().await?;
421 /// # Ok(())
422 /// # }
423 /// ```
424 pub async fn try_clone(&self) -> io::Result<File> {
425 self.inner.lock().await.complete_inflight().await;
426 let std = self.std.clone();
427 let std_file = asyncify(move || std.try_clone()).await?;
428 Ok(File::from_std(std_file))
429 }
430
431 /// Destructures `File` into a [`std::fs::File`]. This function is
432 /// async to allow any in-flight operations to complete.
433 ///
434 /// Use `File::try_into_std` to attempt conversion immediately.
435 ///
436 /// # Examples
437 ///
438 /// ```no_run
439 /// use tokio::fs::File;
440 ///
441 /// # async fn dox() -> std::io::Result<()> {
442 /// let tokio_file = File::open("foo.txt").await?;
443 /// let std_file = tokio_file.into_std().await;
444 /// # Ok(())
445 /// # }
446 /// ```
    pub async fn into_std(mut self) -> StdFile {
        // Wait for the in-flight operation first; its closure holds a clone of
        // the handle.
        self.inner.get_mut().complete_inflight().await;
        // Panics if another clone of the `Arc` is still alive at this point.
        Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
    }
451
452 /// Tries to immediately destructure `File` into a [`std::fs::File`].
453 ///
454 /// # Errors
455 ///
456 /// This function will return an error containing the file if some
457 /// operation is in-flight.
458 ///
459 /// # Examples
460 ///
461 /// ```no_run
462 /// use tokio::fs::File;
463 ///
464 /// # async fn dox() -> std::io::Result<()> {
465 /// let tokio_file = File::open("foo.txt").await?;
466 /// let std_file = tokio_file.try_into_std().unwrap();
467 /// # Ok(())
468 /// # }
469 /// ```
470 pub fn try_into_std(mut self) -> Result<StdFile, Self> {
471 match Arc::try_unwrap(self.std) {
472 Ok(file) => Ok(file),
473 Err(std_file_arc) => {
474 self.std = std_file_arc;
475 Err(self)
476 }
477 }
478 }
479
480 /// Changes the permissions on the underlying file.
481 ///
482 /// # Platform-specific behavior
483 ///
484 /// This function currently corresponds to the `fchmod` function on Unix and
    /// the `SetFileInformationByHandle` function on Windows. Note that this
486 /// [may change in the future][changes].
487 ///
488 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
489 ///
490 /// # Errors
491 ///
    /// This function will return an error if the user lacks permission to change
    /// attributes on the underlying file. It may also return an error in other
    /// OS-specific unspecified cases.
495 ///
496 /// # Examples
497 ///
498 /// ```no_run
499 /// use tokio::fs::File;
500 ///
501 /// # async fn dox() -> std::io::Result<()> {
502 /// let file = File::open("foo.txt").await?;
503 /// let mut perms = file.metadata().await?.permissions();
504 /// perms.set_readonly(true);
505 /// file.set_permissions(perms).await?;
506 /// # Ok(())
507 /// # }
508 /// ```
509 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
510 let std = self.std.clone();
511 asyncify(move || std.set_permissions(perm)).await
512 }
513
514 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
515 ///
516 /// Although Tokio uses a sensible default value for this buffer size, this function would be
517 /// useful for changing that default depending on the situation.
518 ///
519 /// # Examples
520 ///
521 /// ```no_run
522 /// use tokio::fs::File;
523 /// use tokio::io::AsyncWriteExt;
524 ///
525 /// # async fn dox() -> std::io::Result<()> {
    /// let mut file = File::create("foo.txt").await?;
527 ///
528 /// // Set maximum buffer size to 8 MiB
529 /// file.set_max_buf_size(8 * 1024 * 1024);
530 ///
531 /// let mut buf = vec![1; 1024 * 1024 * 1024];
532 ///
533 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
534 /// file.write_all(&mut buf).await?;
535 /// # Ok(())
536 /// # }
537 /// ```
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        // Only stores the limit; it is read when later reads and writes
        // prepare the buffer.
        self.max_buf_size = max_buf_size;
    }
541}
542
impl AsyncRead for File {
    /// Copies buffered data into `dst`, starting a blocking read when the
    /// buffer is empty and first draining whatever operation is in flight.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(crate::trace::trace_leaf(cx));
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Serve data left in the buffer before touching the file.
                    if !buf.is_empty() {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Poll::Ready(Ok(()));
                    }

                    buf.ensure_capacity_for(dst, me.max_buf_size);
                    let std = me.std.clone();

                    // Start a blocking read; the next loop iteration polls it.
                    inner.state = State::Busy(spawn_blocking(move || {
                        let res = buf.read_from(&mut &*std);
                        (Operation::Read(res), buf)
                    }));
                }
                State::Busy(ref mut rx) => {
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Err(e));
                        }
                        Operation::Write(Ok(())) => {
                            // A finished write leaves nothing to hand out;
                            // loop again to start the read.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            // Write errors are not reported from reads. Keep
                            // the kind for the next write/flush call. There is
                            // no explicit `continue`: control falls out of the
                            // match and the loop repeats.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = State::Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            // Record a successful seek position; a seek error
                            // is dropped here.
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}
611
612impl AsyncSeek for File {
613 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
614 let me = self.get_mut();
615 let inner = me.inner.get_mut();
616
617 match inner.state {
618 State::Busy(_) => Err(io::Error::new(
619 io::ErrorKind::Other,
620 "other file operation is pending, call poll_complete before start_seek",
621 )),
622 State::Idle(ref mut buf_cell) => {
623 let mut buf = buf_cell.take().unwrap();
624
625 // Factor in any unread data from the buf
626 if !buf.is_empty() {
627 let n = buf.discard_read();
628
629 if let SeekFrom::Current(ref mut offset) = pos {
630 *offset += n;
631 }
632 }
633
634 let std = me.std.clone();
635
636 inner.state = State::Busy(spawn_blocking(move || {
637 let res = (&*std).seek(pos);
638 (Operation::Seek(res), buf)
639 }));
640 Ok(())
641 }
642 }
643 }
644
645 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
646 ready!(crate::trace::trace_leaf(cx));
647 let inner = self.inner.get_mut();
648
649 loop {
650 match inner.state {
651 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
652 State::Busy(ref mut rx) => {
653 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
654 inner.state = State::Idle(Some(buf));
655
656 match op {
657 Operation::Read(_) => {}
658 Operation::Write(Err(e)) => {
659 assert!(inner.last_write_err.is_none());
660 inner.last_write_err = Some(e.kind());
661 }
662 Operation::Write(_) => {}
663 Operation::Seek(res) => {
664 if let Ok(pos) = res {
665 inner.pos = pos;
666 }
667 return Poll::Ready(res);
668 }
669 }
670 }
671 }
672 }
673 }
674}
675
676impl AsyncWrite for File {
677 fn poll_write(
678 self: Pin<&mut Self>,
679 cx: &mut Context<'_>,
680 src: &[u8],
681 ) -> Poll<io::Result<usize>> {
682 ready!(crate::trace::trace_leaf(cx));
683 let me = self.get_mut();
684 let inner = me.inner.get_mut();
685
686 if let Some(e) = inner.last_write_err.take() {
687 return Poll::Ready(Err(e.into()));
688 }
689
690 loop {
691 match inner.state {
692 State::Idle(ref mut buf_cell) => {
693 let mut buf = buf_cell.take().unwrap();
694
695 let seek = if !buf.is_empty() {
696 Some(SeekFrom::Current(buf.discard_read()))
697 } else {
698 None
699 };
700
701 let n = buf.copy_from(src, me.max_buf_size);
702 let std = me.std.clone();
703
704 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
705 let res = if let Some(seek) = seek {
706 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
707 } else {
708 buf.write_to(&mut &*std)
709 };
710
711 (Operation::Write(res), buf)
712 })
713 .ok_or_else(|| {
714 io::Error::new(io::ErrorKind::Other, "background task failed")
715 })?;
716
717 inner.state = State::Busy(blocking_task_join_handle);
718
719 return Poll::Ready(Ok(n));
720 }
721 State::Busy(ref mut rx) => {
722 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
723 inner.state = State::Idle(Some(buf));
724
725 match op {
726 Operation::Read(_) => {
727 // We don't care about the result here. The fact
728 // that the cursor has advanced will be reflected in
729 // the next iteration of the loop
730 continue;
731 }
732 Operation::Write(res) => {
733 // If the previous write was successful, continue.
734 // Otherwise, error.
735 res?;
736 continue;
737 }
738 Operation::Seek(_) => {
739 // Ignore the seek
740 continue;
741 }
742 }
743 }
744 }
745 }
746 }
747
748 fn poll_write_vectored(
749 self: Pin<&mut Self>,
750 cx: &mut Context<'_>,
751 bufs: &[io::IoSlice<'_>],
752 ) -> Poll<Result<usize, io::Error>> {
753 ready!(crate::trace::trace_leaf(cx));
754 let me = self.get_mut();
755 let inner = me.inner.get_mut();
756
757 if let Some(e) = inner.last_write_err.take() {
758 return Poll::Ready(Err(e.into()));
759 }
760
761 loop {
762 match inner.state {
763 State::Idle(ref mut buf_cell) => {
764 let mut buf = buf_cell.take().unwrap();
765
766 let seek = if !buf.is_empty() {
767 Some(SeekFrom::Current(buf.discard_read()))
768 } else {
769 None
770 };
771
772 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
773 let std = me.std.clone();
774
775 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
776 let res = if let Some(seek) = seek {
777 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
778 } else {
779 buf.write_to(&mut &*std)
780 };
781
782 (Operation::Write(res), buf)
783 })
784 .ok_or_else(|| {
785 io::Error::new(io::ErrorKind::Other, "background task failed")
786 })?;
787
788 inner.state = State::Busy(blocking_task_join_handle);
789
790 return Poll::Ready(Ok(n));
791 }
792 State::Busy(ref mut rx) => {
793 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
794 inner.state = State::Idle(Some(buf));
795
796 match op {
797 Operation::Read(_) => {
798 // We don't care about the result here. The fact
799 // that the cursor has advanced will be reflected in
800 // the next iteration of the loop
801 continue;
802 }
803 Operation::Write(res) => {
804 // If the previous write was successful, continue.
805 // Otherwise, error.
806 res?;
807 continue;
808 }
809 Operation::Seek(_) => {
810 // Ignore the seek
811 continue;
812 }
813 }
814 }
815 }
816 }
817 }
818
    /// Always `true`: this impl provides its own `poll_write_vectored`.
    fn is_write_vectored(&self) -> bool {
        true
    }
822
    /// Delegates to `Inner::poll_flush`, which reports a saved write error or
    /// waits for the in-flight operation.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        ready!(crate::trace::trace_leaf(cx));
        let inner = self.inner.get_mut();
        inner.poll_flush(cx)
    }
828
    /// Shutdown is implemented as a flush; nothing else is done here.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        ready!(crate::trace::trace_leaf(cx));
        self.poll_flush(cx)
    }
833}
834
impl From<StdFile> for File {
    /// Equivalent to [`File::from_std`].
    fn from(std: StdFile) -> Self {
        Self::from_std(std)
    }
}
840
841impl fmt::Debug for File {
842 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
843 fmt.debug_struct("tokio::fs::File")
844 .field("std", &self.std)
845 .finish()
846 }
847}
848
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
    /// Returns the raw descriptor of the wrapped file.
    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
        self.std.as_raw_fd()
    }
}
855
#[cfg(unix)]
impl std::os::unix::io::AsFd for File {
    /// Borrows the descriptor of the wrapped file for the lifetime of `&self`.
    fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
        // SAFETY: the raw descriptor comes from `self.std`, which `self` owns,
        // and the returned `BorrowedFd` is tied to the borrow of `self`, so it
        // cannot outlive this `File`.
        unsafe {
            std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
        }
    }
}
864
#[cfg(unix)]
impl std::os::unix::io::FromRawFd for File {
    /// Wraps `fd` in the standard file type and converts that into a `File`.
    ///
    /// # Safety
    ///
    /// The caller must uphold the requirements of the underlying
    /// `from_raw_fd` call that `fd` is passed to.
    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
        StdFile::from_raw_fd(fd).into()
    }
}
871
cfg_windows! {
    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};

    impl AsRawHandle for File {
        // Returns the raw handle of the wrapped file.
        fn as_raw_handle(&self) -> RawHandle {
            self.std.as_raw_handle()
        }
    }

    impl AsHandle for File {
        // Borrows the handle of the wrapped file for the lifetime of `&self`.
        fn as_handle(&self) -> BorrowedHandle<'_> {
            // SAFETY: the raw handle comes from `self.std`, which `self` owns,
            // and the returned `BorrowedHandle` is tied to the borrow of `self`.
            unsafe {
                BorrowedHandle::borrow_raw(
                    AsRawHandle::as_raw_handle(self),
                )
            }
        }
    }

    impl FromRawHandle for File {
        // Wraps `handle` in the standard file type and converts that into a
        // `File`. The caller must uphold the requirements of the underlying
        // `from_raw_handle` call.
        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
            StdFile::from_raw_handle(handle).into()
        }
    }
}
897
898impl Inner {
899 async fn complete_inflight(&mut self) {
900 use crate::future::poll_fn;
901
902 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
903 }
904
905 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
906 ready!(crate::trace::trace_leaf(cx));
907 match self.poll_flush(cx) {
908 Poll::Ready(Err(e)) => {
909 self.last_write_err = Some(e.kind());
910 Poll::Ready(())
911 }
912 Poll::Ready(Ok(())) => Poll::Ready(()),
913 Poll::Pending => Poll::Pending,
914 }
915 }
916
917 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
918 if let Some(e) = self.last_write_err.take() {
919 return Poll::Ready(Err(e.into()));
920 }
921
922 let (op, buf) = match self.state {
923 State::Idle(_) => return Poll::Ready(Ok(())),
924 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
925 };
926
927 // The buffer is not used here
928 self.state = State::Idle(Some(buf));
929
930 match op {
931 Operation::Read(_) => Poll::Ready(Ok(())),
932 Operation::Write(res) => Poll::Ready(res),
933 Operation::Seek(_) => Poll::Ready(Ok(())),
934 }
935 }
936}
937
938#[cfg(test)]
939mod tests;