async_std/fs/file.rs
1use std::cell::UnsafeCell;
2use std::cmp;
3use std::fmt;
4use std::io::{Read as _, Seek as _, Write as _};
5use std::ops::{Deref, DerefMut};
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9
10use crate::fs::{Metadata, Permissions};
11use crate::future;
12use crate::io::{self, Read, Seek, SeekFrom, Write};
13use crate::path::Path;
14use crate::prelude::*;
15use crate::task::{spawn_blocking, Context, Poll, Waker};
16use crate::utils::Context as _;
17
/// Panic message passed to `expect` when `Arc::try_unwrap` on the inner file handle fails while
/// converting an async `File` back into a raw descriptor/handle or an owned std type.
const ARC_TRY_UNWRAP_EXPECT: &str = "cannot acquire ownership of the file handle after drop";
19
20/// An open file on the filesystem.
21///
22/// Depending on what options the file was opened with, this type can be used for reading and/or
23/// writing.
24///
25/// Files are automatically closed when they get dropped and any errors detected on closing are
26/// ignored. Use the [`sync_all`] method before dropping a file if such errors need to be handled.
27///
28/// This type is an async version of [`std::fs::File`].
29///
30/// [`sync_all`]: #method.sync_all
31/// [`std::fs::File`]: https://doc.rust-lang.org/std/fs/struct.File.html
32///
33/// # Examples
34///
35/// Create a new file and write some bytes to it:
36///
37/// ```no_run
38/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
39/// #
40/// use async_std::fs::File;
41/// use async_std::prelude::*;
42///
43/// let mut file = File::create("a.txt").await?;
44/// file.write_all(b"Hello, world!").await?;
45/// #
46/// # Ok(()) }) }
47/// ```
48///
49/// Read the contents of a file into a vector of bytes:
50///
51/// ```no_run
52/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
53/// #
54/// use async_std::fs::File;
55/// use async_std::prelude::*;
56///
57/// let mut file = File::open("a.txt").await?;
58/// let mut contents = Vec::new();
59/// file.read_to_end(&mut contents).await?;
60/// #
61/// # Ok(()) }) }
62/// ```
#[derive(Clone)]
pub struct File {
    /// A reference to the inner file.
    ///
    /// This is a clone of the same `Arc` that is stored in `State::file`. Methods such as
    /// `metadata` and `set_permissions` use it directly, without acquiring `lock`.
    file: Arc<std::fs::File>,

    /// The state of the file protected by an async lock.
    ///
    /// Cloning a `File` clones this `Lock`, which shares the same underlying state.
    lock: Lock<State>,
}
71
72impl File {
73 /// Creates an async file handle.
74 pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File {
75 let file = Arc::new(file);
76
77 File {
78 file: file.clone(),
79 lock: Lock::new(State {
80 file,
81 mode: Mode::Idle,
82 cache: Vec::new(),
83 is_flushed,
84 last_read_err: None,
85 last_write_err: None,
86 }),
87 }
88 }
89
90 /// Opens a file in read-only mode.
91 ///
92 /// See the [`OpenOptions::open`] function for more options.
93 ///
94 /// # Errors
95 ///
96 /// An error will be returned in the following situations:
97 ///
98 /// * `path` does not point to an existing file.
99 /// * The current process lacks permissions to read the file.
100 /// * Some other I/O error occurred.
101 ///
102 /// For more details, see the list of errors documented by [`OpenOptions::open`].
103 ///
104 /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
105 ///
106 /// # Examples
107 ///
108 /// ```no_run
109 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
110 /// #
111 /// use async_std::fs::File;
112 ///
113 /// let file = File::open("a.txt").await?;
114 /// #
115 /// # Ok(()) }) }
116 /// ```
117 pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
118 let path = path.as_ref().to_owned();
119 let file = spawn_blocking(move || {
120 std::fs::File::open(&path).context(|| format!("could not open `{}`", path.display()))
121 })
122 .await?;
123 Ok(File::new(file, true))
124 }
125
126 /// Opens a file in write-only mode.
127 ///
128 /// This function will create a file if it does not exist, and will truncate it if it does.
129 ///
130 /// See the [`OpenOptions::open`] function for more options.
131 ///
132 /// # Errors
133 ///
134 /// An error will be returned in the following situations:
135 ///
136 /// * The file's parent directory does not exist.
137 /// * The current process lacks permissions to write to the file.
138 /// * Some other I/O error occurred.
139 ///
140 /// For more details, see the list of errors documented by [`OpenOptions::open`].
141 ///
142 /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
143 ///
144 /// # Examples
145 ///
146 /// ```no_run
147 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
148 /// #
149 /// use async_std::fs::File;
150 ///
151 /// let file = File::create("a.txt").await?;
152 /// #
153 /// # Ok(()) }) }
154 /// ```
155 pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
156 let path = path.as_ref().to_owned();
157 let file = spawn_blocking(move || {
158 std::fs::File::create(&path)
159 })
160 .await?;
161 Ok(File::new(file, true))
162 }
163
164 /// Synchronizes OS-internal buffered contents and metadata to disk.
165 ///
166 /// This function will ensure that all in-memory data reaches the filesystem.
167 ///
168 /// This can be used to handle errors that would otherwise only be caught when the file is
169 /// closed. When a file is dropped, errors in synchronizing this in-memory data are ignored.
170 ///
171 /// # Examples
172 ///
173 /// ```no_run
174 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
175 /// #
176 /// use async_std::fs::File;
177 /// use async_std::prelude::*;
178 ///
179 /// let mut file = File::create("a.txt").await?;
180 /// file.write_all(b"Hello, world!").await?;
181 /// file.sync_all().await?;
182 /// #
183 /// # Ok(()) }) }
184 /// ```
185 pub async fn sync_all(&self) -> io::Result<()> {
186 // Flush the write cache before calling `sync_all()`.
187 let state = future::poll_fn(|cx| {
188 let state = futures_core::ready!(self.lock.poll_lock(cx));
189 state.poll_flush(cx)
190 })
191 .await?;
192
193 spawn_blocking(move || state.file.sync_all()).await
194 }
195
196 /// Synchronizes OS-internal buffered contents to disk.
197 ///
198 /// This is similar to [`sync_all`], except that file metadata may not be synchronized.
199 ///
200 /// This is intended for use cases that must synchronize the contents of the file, but don't
201 /// need the file metadata synchronized to disk.
202 ///
203 /// Note that some platforms may simply implement this in terms of [`sync_all`].
204 ///
205 /// [`sync_all`]: #method.sync_all
206 ///
207 /// # Examples
208 ///
209 /// ```no_run
210 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
211 /// #
212 /// use async_std::fs::File;
213 /// use async_std::prelude::*;
214 ///
215 /// let mut file = File::create("a.txt").await?;
216 /// file.write_all(b"Hello, world!").await?;
217 /// file.sync_data().await?;
218 /// #
219 /// # Ok(()) }) }
220 /// ```
221 pub async fn sync_data(&self) -> io::Result<()> {
222 // Flush the write cache before calling `sync_data()`.
223 let state = future::poll_fn(|cx| {
224 let state = futures_core::ready!(self.lock.poll_lock(cx));
225 state.poll_flush(cx)
226 })
227 .await?;
228
229 spawn_blocking(move || state.file.sync_data()).await
230 }
231
232 /// Truncates or extends the file.
233 ///
234 /// If `size` is less than the current file size, then the file will be truncated. If it is
235 /// greater than the current file size, then the file will be extended to `size` and have all
236 /// intermediate data filled with zeros.
237 ///
238 /// The file's cursor stays at the same position, even if the cursor ends up being past the end
239 /// of the file after this operation.
240 ///
241 /// # Examples
242 ///
243 /// ```no_run
244 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
245 /// #
246 /// use async_std::fs::File;
247 ///
248 /// let file = File::create("a.txt").await?;
249 /// file.set_len(10).await?;
250 /// #
251 /// # Ok(()) }) }
252 /// ```
253 pub async fn set_len(&self, size: u64) -> io::Result<()> {
254 // Invalidate the read cache and flush the write cache before calling `set_len()`.
255 let state = future::poll_fn(|cx| {
256 let state = futures_core::ready!(self.lock.poll_lock(cx));
257 let state = futures_core::ready!(state.poll_unread(cx))?;
258 state.poll_flush(cx)
259 })
260 .await?;
261
262 spawn_blocking(move || state.file.set_len(size)).await
263 }
264
265 /// Reads the file's metadata.
266 ///
267 /// # Examples
268 ///
269 /// ```no_run
270 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
271 /// #
272 /// use async_std::fs::File;
273 ///
274 /// let file = File::open("a.txt").await?;
275 /// let metadata = file.metadata().await?;
276 /// #
277 /// # Ok(()) }) }
278 /// ```
279 pub async fn metadata(&self) -> io::Result<Metadata> {
280 let file = self.file.clone();
281 spawn_blocking(move || file.metadata()).await
282 }
283
284 /// Changes the permissions on the file.
285 ///
286 /// # Errors
287 ///
288 /// An error will be returned in the following situations:
289 ///
290 /// * The current process lacks permissions to change attributes on the file.
291 /// * Some other I/O error occurred.
292 ///
293 /// # Examples
294 ///
295 /// ```no_run
296 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
297 /// #
298 /// use async_std::fs::File;
299 ///
300 /// let file = File::create("a.txt").await?;
301 ///
302 /// let mut perms = file.metadata().await?.permissions();
303 /// perms.set_readonly(true);
304 /// file.set_permissions(perms).await?;
305 /// #
306 /// # Ok(()) }) }
307 /// ```
308 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
309 let file = self.file.clone();
310 spawn_blocking(move || file.set_permissions(perm)).await
311 }
312}
313
314impl Drop for File {
315 fn drop(&mut self) {
316 // We need to flush the file on drop. Unfortunately, that is not possible to do in a
317 // non-blocking fashion, but our only other option here is losing data remaining in the
318 // write cache. Good task schedulers should be resilient to occasional blocking hiccups in
319 // file destructors so we don't expect this to be a common problem in practice.
320 let _ = futures_lite::future::block_on(self.flush());
321 }
322}
323
324impl fmt::Debug for File {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 self.file.fmt(f)
327 }
328}
329
330impl Read for File {
331 fn poll_read(
332 self: Pin<&mut Self>,
333 cx: &mut Context<'_>,
334 buf: &mut [u8],
335 ) -> Poll<io::Result<usize>> {
336 Pin::new(&mut &*self).poll_read(cx, buf)
337 }
338}
339
340impl Read for &File {
341 fn poll_read(
342 self: Pin<&mut Self>,
343 cx: &mut Context<'_>,
344 buf: &mut [u8],
345 ) -> Poll<io::Result<usize>> {
346 let state = futures_core::ready!(self.lock.poll_lock(cx));
347 state.poll_read(cx, buf)
348 }
349}
350
351impl Write for File {
352 fn poll_write(
353 self: Pin<&mut Self>,
354 cx: &mut Context<'_>,
355 buf: &[u8],
356 ) -> Poll<io::Result<usize>> {
357 Pin::new(&mut &*self).poll_write(cx, buf)
358 }
359
360 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
361 Pin::new(&mut &*self).poll_flush(cx)
362 }
363
364 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
365 Pin::new(&mut &*self).poll_close(cx)
366 }
367}
368
369impl Write for &File {
370 fn poll_write(
371 self: Pin<&mut Self>,
372 cx: &mut Context<'_>,
373 buf: &[u8],
374 ) -> Poll<io::Result<usize>> {
375 let state = futures_core::ready!(self.lock.poll_lock(cx));
376 state.poll_write(cx, buf)
377 }
378
379 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
380 let state = futures_core::ready!(self.lock.poll_lock(cx));
381 state.poll_flush(cx).map(|res| res.map(drop))
382 }
383
384 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
385 let state = futures_core::ready!(self.lock.poll_lock(cx));
386 state.poll_close(cx)
387 }
388}
389
390impl Seek for File {
391 fn poll_seek(
392 self: Pin<&mut Self>,
393 cx: &mut Context<'_>,
394 pos: SeekFrom,
395 ) -> Poll<io::Result<u64>> {
396 Pin::new(&mut &*self).poll_seek(cx, pos)
397 }
398}
399
400impl Seek for &File {
401 fn poll_seek(
402 self: Pin<&mut Self>,
403 cx: &mut Context<'_>,
404 pos: SeekFrom,
405 ) -> Poll<io::Result<u64>> {
406 let state = futures_core::ready!(self.lock.poll_lock(cx));
407 state.poll_seek(cx, pos)
408 }
409}
410
411impl From<std::fs::File> for File {
412 fn from(file: std::fs::File) -> File {
413 File::new(file, false)
414 }
415}
416
417cfg_unix! {
418 use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
419
420 impl File {
421 fn into_std_file(self) -> std::fs::File {
422 let file = self.file.clone();
423 drop(self);
424 Arc::try_unwrap(file)
425 .expect(ARC_TRY_UNWRAP_EXPECT)
426 }
427 }
428
429 impl AsRawFd for File {
430 fn as_raw_fd(&self) -> RawFd {
431 self.file.as_raw_fd()
432 }
433 }
434
435 impl FromRawFd for File {
436 unsafe fn from_raw_fd(fd: RawFd) -> File {
437 std::fs::File::from_raw_fd(fd).into()
438 }
439 }
440
441 impl IntoRawFd for File {
442 fn into_raw_fd(self) -> RawFd {
443 self.into_std_file().into_raw_fd()
444 }
445 }
446
447 cfg_io_safety! {
448 use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
449
450 impl AsFd for File {
451 fn as_fd(&self) -> BorrowedFd<'_> {
452 self.file.as_fd()
453 }
454 }
455
456 impl From<OwnedFd> for File {
457 fn from(fd: OwnedFd) -> Self {
458 std::fs::File::from(fd).into()
459 }
460 }
461
462 impl From<File> for OwnedFd {
463 fn from(val: File) -> OwnedFd {
464 val.into_std_file().into()
465 }
466 }
467 }
468}
469
470cfg_windows! {
471 use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
472
473 impl AsRawHandle for File {
474 fn as_raw_handle(&self) -> RawHandle {
475 self.file.as_raw_handle()
476 }
477 }
478
479 impl FromRawHandle for File {
480 unsafe fn from_raw_handle(handle: RawHandle) -> File {
481 std::fs::File::from_raw_handle(handle).into()
482 }
483 }
484
485 impl IntoRawHandle for File {
486 fn into_raw_handle(self) -> RawHandle {
487 let file = self.file.clone();
488 drop(self);
489 Arc::try_unwrap(file)
490 .expect(ARC_TRY_UNWRAP_EXPECT)
491 .into_raw_handle()
492 }
493 }
494
495 cfg_io_safety! {
496 use crate::os::windows::io::{AsHandle, BorrowedHandle, OwnedHandle};
497
498 impl AsHandle for File {
499 fn as_handle(&self) -> BorrowedHandle<'_> {
500 self.file.as_handle()
501 }
502 }
503
504 impl From<OwnedHandle> for File {
505 fn from(handle: OwnedHandle) -> Self {
506 std::fs::File::from(handle).into()
507 }
508 }
509
510 impl From<File> for OwnedHandle {
511 fn from(val: File) -> OwnedHandle {
512 let file = val.file.clone();
513 drop(val);
514 Arc::try_unwrap(file)
515 .expect(ARC_TRY_UNWRAP_EXPECT)
516 .into()
517 }
518 }
519 }
520}
521
/// An async mutex with non-borrowing lock guards.
///
/// The guard returned by `poll_lock` holds its own `Arc` to the shared state instead of
/// borrowing from the `Lock`, which is what allows guards to be moved into the closures passed
/// to `spawn_blocking` elsewhere in this file.
struct Lock<T>(Arc<LockState<T>>);

// NOTE(review): the soundness argument is not spelled out in this file. Presumably it is that the
// inner value is only reachable through a `LockGuard`, and at most one guard exists at a time
// (enforced by the `locked` flag), so sharing a `Lock` across threads only requires `T: Send`.
// These are the same bounds `std::sync::Mutex` uses.
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
527
528impl<T> Clone for Lock<T> {
529 #[inline]
530 fn clone(&self) -> Self {
531 Self(Arc::clone(&self.0))
532 }
533}
534
/// The state of a lock.
///
/// Shared through an `Arc` by every `Lock` clone and by the `LockGuard` currently holding it.
struct LockState<T> {
    /// Set to `true` when locked.
    ///
    /// Acquired with an atomic `swap(true)` in `Lock::poll_lock` and cleared when the guard is
    /// dropped.
    locked: AtomicBool,

    /// The inner value.
    ///
    /// Only dereferenced through a `LockGuard`.
    value: UnsafeCell<T>,

    /// A list of tasks interested in acquiring the lock.
    ///
    /// Drained and woken when a guard is dropped.
    wakers: Mutex<Vec<Waker>>,
}
546
impl<T> Lock<T> {
    /// Creates a new lock initialized with `value`.
    ///
    /// The lock starts unlocked with an empty waker list.
    fn new(value: T) -> Lock<T> {
        Lock(Arc::new(LockState {
            locked: AtomicBool::new(false),
            value: UnsafeCell::new(value),
            wakers: Mutex::new(Vec::new()),
        }))
    }

    /// Attempts to acquire the lock.
    ///
    /// Returns `Poll::Ready` with a guard when the lock was free. Otherwise the current task's
    /// waker is added to the waker list (unless an equivalent waker is already present) and
    /// `Poll::Pending` is returned.
    fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
        // Try acquiring the lock.
        //
        // `swap` returns the previous value, so `true` means somebody else holds the lock.
        if self.0.locked.swap(true, Ordering::Acquire) {
            // Lock the list of wakers.
            let mut list = self.0.wakers.lock().unwrap();

            // Try acquiring the lock again.
            //
            // The lock may have been released between the first attempt and taking the waker
            // list mutex, so retry before registering.
            if self.0.locked.swap(true, Ordering::Acquire) {
                // If failed again, add the current task to the list and return.
                if list.iter().all(|w| !w.will_wake(cx.waker())) {
                    list.push(cx.waker().clone());
                }
                return Poll::Pending;
            }
        }

        // The lock was successfully acquired.
        //
        // The guard gets its own `Arc` to the shared state, so it does not borrow from `self`.
        Poll::Ready(LockGuard(Some(self.0.clone())))
    }
}
578
/// A lock guard.
///
/// When dropped, ownership of the inner value is returned back to the lock.
/// The inner value is always Some, except when the lock is dropped, where we
/// set it to None. See comment in drop().
///
/// The guard derefs to the protected value and owns an `Arc` to the lock state, so it does not
/// borrow from the `Lock` it came from.
struct LockGuard<T>(Option<Arc<LockState<T>>>);

// NOTE(review): the soundness argument is not spelled out in this file. Presumably a guard gives
// exclusive access to the value (so moving it to another thread needs `T: Send`), while sharing
// `&LockGuard` only exposes `&T` (so it needs `T: Sync`) — confirm.
unsafe impl<T: Send> Send for LockGuard<T> {}
unsafe impl<T: Sync> Sync for LockGuard<T> {}
588
589impl<T> LockGuard<T> {
590 /// Registers a task interested in acquiring the lock.
591 ///
592 /// When this lock guard gets dropped, all registered tasks will be woken up.
593 fn register(&self, cx: &Context<'_>) {
594 let mut list = self.0.as_ref().unwrap().wakers.lock().unwrap();
595
596 if list.iter().all(|w| !w.will_wake(cx.waker())) {
597 list.push(cx.waker().clone());
598 }
599 }
600}
601
602impl<T> Drop for LockGuard<T> {
603 fn drop(&mut self) {
604 // Set the Option to None and take its value so we can drop the Arc
605 // before we wake up the tasks.
606 let lock = self.0.take().unwrap();
607
608 // Prepare to wake up all registered tasks interested in acquiring the lock.
609 let wakers: Vec<_> = lock.wakers.lock().unwrap().drain(..).collect();
610
611 // Release the lock.
612 lock.locked.store(false, Ordering::Release);
613
614 // Drop the Arc _before_ waking up the tasks, to avoid races. See
615 // reproducer and discussion in https://github.com/async-rs/async-std/issues/1001.
616 drop(lock);
617
618 // Wake up all registered tasks interested in acquiring the lock.
619 for w in wakers {
620 w.wake();
621 }
622 }
623}
624
625impl<T> Deref for LockGuard<T> {
626 type Target = T;
627
628 fn deref(&self) -> &T {
629 // SAFETY: Safe because the lock is held when this method is called. And
630 // the inner value is always Some since it is only set to None in
631 // drop().
632 unsafe { &*self.0.as_ref().unwrap().value.get() }
633 }
634}
635
636impl<T> DerefMut for LockGuard<T> {
637 fn deref_mut(&mut self) -> &mut T {
638 // SAFETY: Safe because the lock is held when this method is called. And
639 // the inner value is always Some since it is only set to None in
640 // drop().
641 unsafe { &mut *self.0.as_ref().unwrap().value.get() }
642 }
643}
644
/// Modes a file can be in.
///
/// The file can either be in idle mode, reading mode, or writing mode.
///
/// The mode describes how the contents of `State::cache` are to be interpreted.
enum Mode {
    /// The cache is empty.
    Idle,

    /// The cache contains data read from the inner file.
    ///
    /// The `usize` represents how many bytes from the beginning of cache have been consumed.
    ///
    /// `Reading(0)` with an empty cache means the last read returned zero bytes; `poll_read`
    /// reports that as end-of-file once and then switches back to idle.
    Reading(usize),

    /// The cache contains data that needs to be written to the inner file.
    Writing,
}
660
/// The current state of a file.
///
/// The `File` struct protects this state behind a lock.
///
/// Filesystem operations that get spawned as blocking tasks will acquire the lock, take ownership
/// of the state and return it back once the operation completes.
struct State {
    /// The inner file.
    ///
    /// A clone of the same `Arc` that `File::file` holds.
    file: Arc<std::fs::File>,

    /// The current mode (idle, reading, or writing).
    mode: Mode,

    /// The read/write cache.
    ///
    /// If in reading mode, the cache contains a chunk of data that has been read from the file.
    /// If in writing mode, the cache contains data that will eventually be written to the file.
    cache: Vec<u8>,

    /// Set to `true` if the file is flushed.
    ///
    /// When a file is flushed, the write cache and the inner file's buffer are empty.
    ///
    /// Cleared by `poll_write` and set again once a flush completes successfully.
    is_flushed: bool,

    /// The last read error that came from an async operation.
    ///
    /// Stored by the blocking read task and returned by the next `poll_read` call.
    last_read_err: Option<io::Error>,

    /// The last write error that came from an async operation.
    ///
    /// Stored by the blocking write/flush tasks and returned by the next `poll_write` or
    /// `poll_drain` call.
    last_write_err: Option<io::Error>,
}
691
692impl LockGuard<State> {
693 /// Seeks to a new position in the file.
694 fn poll_seek(mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<io::Result<u64>> {
695 // If this operation doesn't move the cursor, then poll the current position inside the
696 // file. This call should not block because it doesn't touch the actual file on disk.
697 if pos == SeekFrom::Current(0) {
698 // Poll the internal file cursor.
699 let internal = (&*self.file).seek(SeekFrom::Current(0))?;
700
701 // Factor in the difference caused by caching.
702 let actual = match self.mode {
703 Mode::Idle => internal,
704 Mode::Reading(start) => internal - self.cache.len() as u64 + start as u64,
705 Mode::Writing => internal + self.cache.len() as u64,
706 };
707 return Poll::Ready(Ok(actual));
708 }
709
710 // If the file is in reading mode and the cache will stay valid after seeking, then adjust
711 // the current position in the read cache without invaliding it.
712 if let Mode::Reading(start) = self.mode {
713 if let SeekFrom::Current(diff) = pos {
714 if let Some(new) = (start as i64).checked_add(diff) {
715 if 0 <= new && new <= self.cache.len() as i64 {
716 // Poll the internal file cursor.
717 let internal = (&*self.file).seek(SeekFrom::Current(0))?;
718
719 // Adjust the current position in the read cache.
720 self.mode = Mode::Reading(new as usize);
721
722 // Factor in the difference caused by caching.
723 return Poll::Ready(Ok(internal - self.cache.len() as u64 + new as u64));
724 }
725 }
726 }
727 }
728
729 // Invalidate the read cache and flush the write cache before calling `seek()`.
730 self = futures_core::ready!(self.poll_unread(cx))?;
731 self = futures_core::ready!(self.poll_flush(cx))?;
732
733 // Seek to the new position. This call should not block because it only changes the
734 // internal offset into the file and doesn't touch the actual file on disk.
735 Poll::Ready((&*self.file).seek(pos))
736 }
737
738 /// Reads some bytes from the file into a buffer.
739 fn poll_read(mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
740 // If an async operation has left a read error, return it now.
741 if let Some(err) = self.last_read_err.take() {
742 return Poll::Ready(Err(err));
743 }
744
745 match self.mode {
746 Mode::Idle => {}
747 Mode::Reading(0) if self.cache.is_empty() => {
748 // If the cache is empty in reading mode, the last operation didn't read any bytes,
749 // which indicates that it reached the end of the file. In this case we need to
750 // reset the mode to idle so that next time we try to read again, since the file
751 // may grow after the first EOF.
752 self.mode = Mode::Idle;
753 return Poll::Ready(Ok(0));
754 }
755 Mode::Reading(start) => {
756 // How many bytes in the cache are available for reading.
757 let available = self.cache.len() - start;
758
759 if available > 0 {
760 // Copy data from the cache into the buffer.
761 let n = cmp::min(available, buf.len());
762 buf[..n].copy_from_slice(&self.cache[start..(start + n)]);
763
764 // Move the read cursor forward.
765 self.mode = Mode::Reading(start + n);
766
767 return Poll::Ready(Ok(n));
768 }
769 }
770 Mode::Writing => {
771 // If we're in writing mode, flush the write cache.
772 self = futures_core::ready!(self.poll_flush(cx))?;
773 }
774 }
775
776 // Make the cache as long as `buf`.
777 if self.cache.len() < buf.len() {
778 let diff = buf.len() - self.cache.len();
779 self.cache.reserve(diff);
780 }
781 unsafe {
782 self.cache.set_len(buf.len());
783 }
784
785 // Register current task's interest in the file lock.
786 self.register(cx);
787
788 // Start a read operation asynchronously.
789 spawn_blocking(move || {
790 // Read some data from the file into the cache.
791 let res = {
792 let State { file, cache, .. } = &mut *self;
793 (&**file).read(cache)
794 };
795
796 match res {
797 Ok(n) => {
798 // Update cache length and switch to reading mode, starting from index 0.
799 unsafe {
800 self.cache.set_len(n);
801 }
802 self.mode = Mode::Reading(0);
803 }
804 Err(err) => {
805 // Save the error and switch to idle mode.
806 self.cache.clear();
807 self.mode = Mode::Idle;
808 self.last_read_err = Some(err);
809 }
810 }
811 });
812
813 Poll::Pending
814 }
815
816 /// Invalidates the read cache.
817 ///
818 /// This method will also move the internal file's cursor backwards by the number of unconsumed
819 /// bytes in the read cache.
820 fn poll_unread(mut self, _: &mut Context<'_>) -> Poll<io::Result<Self>> {
821 match self.mode {
822 Mode::Idle | Mode::Writing => Poll::Ready(Ok(self)),
823 Mode::Reading(start) => {
824 // The number of unconsumed bytes in the read cache.
825 let n = self.cache.len() - start;
826
827 if n > 0 {
828 // Seek `n` bytes backwards. This call should not block because it only changes
829 // the internal offset into the file and doesn't touch the actual file on disk.
830 //
831 // We ignore errors here because special files like `/dev/random` are not
832 // seekable.
833 let _ = (&*self.file).seek(SeekFrom::Current(-(n as i64)));
834 }
835
836 // Switch to idle mode.
837 self.cache.clear();
838 self.mode = Mode::Idle;
839
840 Poll::Ready(Ok(self))
841 }
842 }
843 }
844
845 /// Writes some data from a buffer into the file.
846 fn poll_write(mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
847 // If an async operation has left a write error, return it now.
848 if let Some(err) = self.last_write_err.take() {
849 return Poll::Ready(Err(err));
850 }
851
852 // If we're in reading mode, invalidate the read buffer.
853 self = futures_core::ready!(self.poll_unread(cx))?;
854
855 // If necessary, grow the cache to have as much capacity as `buf`.
856 if self.cache.capacity() < buf.len() {
857 let diff = buf.len() - self.cache.capacity();
858 self.cache.reserve(diff);
859 }
860
861 // How many bytes can be written into the cache before filling up.
862 let available = self.cache.capacity() - self.cache.len();
863
864 // If there is space available in the cache or if the buffer is empty, we can write data
865 // into the cache.
866 if available > 0 || buf.is_empty() {
867 let n = cmp::min(available, buf.len());
868 let start = self.cache.len();
869
870 // Copy data from the buffer into the cache.
871 unsafe {
872 self.cache.set_len(start + n);
873 }
874 self.cache[start..start + n].copy_from_slice(&buf[..n]);
875
876 // Mark the file as not flushed and switch to writing mode.
877 self.is_flushed = false;
878 self.mode = Mode::Writing;
879 Poll::Ready(Ok(n))
880 } else {
881 // Drain the write cache because it's full.
882 futures_core::ready!(self.poll_drain(cx))?;
883 Poll::Pending
884 }
885 }
886
887 /// Drains the write cache.
888 fn poll_drain(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
889 // If an async operation has left a write error, return it now.
890 if let Some(err) = self.last_write_err.take() {
891 return Poll::Ready(Err(err));
892 }
893
894 match self.mode {
895 Mode::Idle | Mode::Reading(..) => Poll::Ready(Ok(self)),
896 Mode::Writing => {
897 // Register current task's interest in the file lock.
898 self.register(cx);
899
900 // Start a write operation asynchronously.
901 spawn_blocking(move || {
902 match (&*self.file).write_all(&self.cache) {
903 Ok(_) => {
904 // Switch to idle mode.
905 self.cache.clear();
906 self.mode = Mode::Idle;
907 }
908 Err(err) => {
909 // Save the error.
910 self.last_write_err = Some(err);
911 }
912 };
913 });
914
915 Poll::Pending
916 }
917 }
918 }
919
920 /// Flushes the write cache into the file.
921 fn poll_flush(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
922 // If the file is already in flushed state, return.
923 if self.is_flushed {
924 return Poll::Ready(Ok(self));
925 }
926
927 // If there is data in the write cache, drain it.
928 self = futures_core::ready!(self.poll_drain(cx))?;
929
930 // Register current task's interest in the file lock.
931 self.register(cx);
932
933 // Start a flush operation asynchronously.
934 spawn_blocking(move || {
935 match (&*self.file).flush() {
936 Ok(()) => {
937 // Mark the file as flushed.
938 self.is_flushed = true;
939 }
940 Err(err) => {
941 // Save the error.
942 self.last_write_err = Some(err);
943 }
944 }
945 });
946
947 Poll::Pending
948 }
949
// This function does nothing because we're not sure about `AsyncWrite::poll_close()`'s exact
// semantics nor whether it will stay in the `AsyncWrite` trait.
//
// It consumes the guard and immediately reports success; no flush is attempted here.
fn poll_close(self, _: &mut Context<'_>) -> Poll<io::Result<()>> {
    Poll::Ready(Ok(()))
}
955}
956
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens this source file and drops the handle right away, exercising `Drop for File`.
    #[test]
    fn async_file_drop() {
        crate::task::block_on(async move {
            File::open(file!()).await.unwrap();
        });
    }

    /// Reads the whole file through a clone inside a blocking task and checks that the number
    /// of bytes read matches the length reported by the original handle's metadata.
    #[test]
    fn async_file_clone() {
        crate::task::block_on(async move {
            let file = File::open(file!()).await.unwrap();
            let mut clone = file.clone();
            let len = crate::task::spawn_blocking(move || {
                let mut buf = Vec::new();
                crate::task::block_on(async move {
                    clone.read_to_end(&mut buf).await.unwrap();
                    drop(clone);
                    buf.len()
                })
            }).await;
            assert_eq!(len as u64, file.metadata().await.unwrap().len());
        });
    }

    /// Checks that a failing `File::create` reports the same error message as
    /// `std::fs::File::create` for the same path.
    #[test]
    fn async_file_create_error() {
        let file_name = Path::new("/tmp/does_not_exist/test");
        let expect = std::fs::File::create(file_name).unwrap_err();

        crate::task::block_on(async move {
            let actual = File::create(file_name).await.unwrap_err();
            assert_eq!(format!("{}", expect), format!("{}", actual));
        })
    }

    /// Checks that hitting end-of-file is not sticky: after the writer appends more data, the
    /// reader sees it on the next read.
    #[test]
    fn file_eof_is_not_permanent() -> crate::io::Result<()> {
        let tempdir = tempfile::Builder::new()
            .prefix("async-std-file-eof-test")
            .tempdir()?;
        let path = tempdir.path().join("testfile");

        crate::task::block_on(async {
            // Separate handles for writing and reading the same path.
            let mut file_w = File::create(&path).await?;
            let mut file_r = File::open(&path).await?;

            file_w.write_all(b"data").await?;
            file_w.flush().await?;

            let mut buf = [0u8; 4];
            let mut len = file_r.read(&mut buf).await?;
            assert_eq!(len, 4);
            assert_eq!(&buf, b"data");

            // First EOF.
            len = file_r.read(&mut buf).await?;
            assert_eq!(len, 0);

            // Grow the file after the reader has seen EOF.
            file_w.write_all(b"more").await?;
            file_w.flush().await?;

            len = file_r.read(&mut buf).await?;
            assert_eq!(len, 4);
            assert_eq!(&buf, b"more");

            // Second EOF.
            len = file_r.read(&mut buf).await?;
            assert_eq!(len, 0);

            Ok(())
        })
    }
}