broker_tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use self::State::*;
6use crate::fs::{asyncify, sys};
7use crate::io::blocking::Buf;
8use crate::io::{AsyncRead, AsyncSeek, AsyncWrite};
9
10use std::fmt;
11use std::fs::{Metadata, Permissions};
12use std::future::Future;
13use std::io::{self, Seek, SeekFrom};
14use std::path::Path;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::Context;
18use std::task::Poll;
19use std::task::Poll::*;
20
21/// A reference to an open file on the filesystem.
22///
23/// This is a specialized version of [`std::fs::File`][std] for usage from the
24/// Tokio runtime.
25///
26/// An instance of a `File` can be read and/or written depending on what options
27/// it was opened with. Files also implement Seek to alter the logical cursor
28/// that the file contains internally.
29///
30/// Files are automatically closed when they go out of scope.
31///
32/// [std]: std::fs::File
33///
34/// # Examples
35///
36/// Create a new file and asynchronously write bytes to it:
37///
38/// ```no_run
39/// use tokio::fs::File;
40/// use tokio::prelude::*;
41///
42/// # async fn dox() -> std::io::Result<()> {
43/// let mut file = File::create("foo.txt").await?;
44/// file.write_all(b"hello, world!").await?;
45/// # Ok(())
46/// # }
47/// ```
48///
49/// Read the contents of a file into a buffer
50///
51/// ```no_run
52/// use tokio::fs::File;
53/// use tokio::prelude::*;
54///
55/// # async fn dox() -> std::io::Result<()> {
56/// let mut file = File::open("foo.txt").await?;
57///
58/// let mut contents = vec![];
59/// file.read_to_end(&mut contents).await?;
60///
61/// println!("len = {}", contents.len());
62/// # Ok(())
63/// # }
64/// ```
65pub struct File {
66 std: Arc<sys::File>,
67 state: State,
68
69 /// Errors from writes/flushes are returned in write/flush calls. If a write
70 /// error is observed while performing a read, it is saved until the next
71 /// write / flush call.
72 last_write_err: Option<io::ErrorKind>,
73}
74
75#[derive(Debug)]
76enum State {
77 Idle(Option<Buf>),
78 Busy(sys::Blocking<(Operation, Buf)>),
79}
80
/// The kind of blocking operation that completed, carrying its result.
#[derive(Debug)]
enum Operation {
    /// Result of filling the buffer from the file (`Buf::read_from`).
    Read(io::Result<usize>),

    /// Result of writing the buffer out to the file (`Buf::write_to`),
    /// including any seek that had to happen first.
    Write(io::Result<()>),

    /// Result of a seek. `set_len` also reports through this variant, using
    /// a placeholder position that is discarded.
    Seek(io::Result<u64>),
}
87
88impl File {
89 /// Attempts to open a file in read-only mode.
90 ///
91 /// See [`OpenOptions`] for more details.
92 ///
93 /// [`OpenOptions`]: super::OpenOptions
94 ///
95 /// # Errors
96 ///
97 /// This function will return an error if called from outside of the Tokio
98 /// runtime or if path does not already exist. Other errors may also be
99 /// returned according to OpenOptions::open.
100 ///
101 /// # Examples
102 ///
103 /// ```no_run
104 /// use tokio::fs::File;
105 /// use tokio::prelude::*;
106 ///
107 /// # async fn dox() -> std::io::Result<()> {
108 /// let mut file = File::open("foo.txt").await?;
109 ///
110 /// let mut contents = vec![];
111 /// file.read_to_end(&mut contents).await?;
112 ///
113 /// println!("len = {}", contents.len());
114 /// # Ok(())
115 /// # }
116 /// ```
117 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
118 let path = path.as_ref().to_owned();
119 let std = asyncify(|| sys::File::open(path)).await?;
120
121 Ok(File::from_std(std))
122 }
123
124 /// Opens a file in write-only mode.
125 ///
126 /// This function will create a file if it does not exist, and will truncate
127 /// it if it does.
128 ///
129 /// See [`OpenOptions`] for more details.
130 ///
131 /// [`OpenOptions`]: super::OpenOptions
132 ///
133 /// # Errors
134 ///
135 /// Results in an error if called from outside of the Tokio runtime or if
136 /// the underlying [`create`] call results in an error.
137 ///
138 /// [`create`]: std::fs::File::create
139 ///
140 /// # Examples
141 ///
142 /// ```no_run
143 /// use tokio::fs::File;
144 /// use tokio::prelude::*;
145 ///
146 /// # async fn dox() -> std::io::Result<()> {
147 /// let mut file = File::create("foo.txt").await?;
148 /// file.write_all(b"hello, world!").await?;
149 /// # Ok(())
150 /// # }
151 /// ```
152 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
153 let path = path.as_ref().to_owned();
154 let std_file = asyncify(move || sys::File::create(path)).await?;
155 Ok(File::from_std(std_file))
156 }
157
158 /// Convert a [`std::fs::File`][std] to a [`tokio::fs::File`][file].
159 ///
160 /// [std]: std::fs::File
161 /// [file]: File
162 ///
163 /// # Examples
164 ///
165 /// ```no_run
166 /// // This line could block. It is not recommended to do this on the Tokio
167 /// // runtime.
168 /// let std_file = std::fs::File::open("foo.txt").unwrap();
169 /// let file = tokio::fs::File::from_std(std_file);
170 /// ```
171 pub fn from_std(std: sys::File) -> File {
172 File {
173 std: Arc::new(std),
174 state: State::Idle(Some(Buf::with_capacity(0))),
175 last_write_err: None,
176 }
177 }
178
179 /// Seek to an offset, in bytes, in a stream.
180 ///
181 /// # Examples
182 ///
183 /// ```no_run
184 /// use tokio::fs::File;
185 /// use tokio::prelude::*;
186 ///
187 /// use std::io::SeekFrom;
188 ///
189 /// # async fn dox() -> std::io::Result<()> {
190 /// let mut file = File::open("foo.txt").await?;
191 /// file.seek(SeekFrom::Start(6)).await?;
192 ///
193 /// let mut contents = vec![0u8; 10];
194 /// file.read_exact(&mut contents).await?;
195 /// # Ok(())
196 /// # }
197 /// ```
198 pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
199 self.complete_inflight().await;
200
201 let mut buf = match self.state {
202 Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
203 _ => unreachable!(),
204 };
205
206 // Factor in any unread data from the buf
207 if !buf.is_empty() {
208 let n = buf.discard_read();
209
210 if let SeekFrom::Current(ref mut offset) = pos {
211 *offset += n;
212 }
213 }
214
215 let std = self.std.clone();
216
217 // Start the operation
218 self.state = Busy(sys::run(move || {
219 let res = (&*std).seek(pos);
220 (Operation::Seek(res), buf)
221 }));
222
223 let (op, buf) = match self.state {
224 Idle(_) => unreachable!(),
225 Busy(ref mut rx) => rx.await.unwrap(),
226 };
227
228 self.state = Idle(Some(buf));
229
230 match op {
231 Operation::Seek(res) => res,
232 _ => unreachable!(),
233 }
234 }
235
236 /// Attempts to sync all OS-internal metadata to disk.
237 ///
238 /// This function will attempt to ensure that all in-core data reaches the
239 /// filesystem before returning.
240 ///
241 /// # Examples
242 ///
243 /// ```no_run
244 /// use tokio::fs::File;
245 /// use tokio::prelude::*;
246 ///
247 /// # async fn dox() -> std::io::Result<()> {
248 /// let mut file = File::create("foo.txt").await?;
249 /// file.write_all(b"hello, world!").await?;
250 /// file.sync_all().await?;
251 /// # Ok(())
252 /// # }
253 /// ```
254 pub async fn sync_all(&mut self) -> io::Result<()> {
255 self.complete_inflight().await;
256
257 let std = self.std.clone();
258 asyncify(move || std.sync_all()).await
259 }
260
261 /// This function is similar to `sync_all`, except that it may not
262 /// synchronize file metadata to the filesystem.
263 ///
264 /// This is intended for use cases that must synchronize content, but don't
265 /// need the metadata on disk. The goal of this method is to reduce disk
266 /// operations.
267 ///
268 /// Note that some platforms may simply implement this in terms of `sync_all`.
269 ///
270 /// # Examples
271 ///
272 /// ```no_run
273 /// use tokio::fs::File;
274 /// use tokio::prelude::*;
275 ///
276 /// # async fn dox() -> std::io::Result<()> {
277 /// let mut file = File::create("foo.txt").await?;
278 /// file.write_all(b"hello, world!").await?;
279 /// file.sync_data().await?;
280 /// # Ok(())
281 /// # }
282 /// ```
283 pub async fn sync_data(&mut self) -> io::Result<()> {
284 self.complete_inflight().await;
285
286 let std = self.std.clone();
287 asyncify(move || std.sync_data()).await
288 }
289
290 /// Truncates or extends the underlying file, updating the size of this file to become size.
291 ///
292 /// If the size is less than the current file's size, then the file will be
293 /// shrunk. If it is greater than the current file's size, then the file
294 /// will be extended to size and have all of the intermediate data filled in
295 /// with 0s.
296 ///
297 /// # Errors
298 ///
299 /// This function will return an error if the file is not opened for
300 /// writing.
301 ///
302 /// # Examples
303 ///
304 /// ```no_run
305 /// use tokio::fs::File;
306 /// use tokio::prelude::*;
307 ///
308 /// # async fn dox() -> std::io::Result<()> {
309 /// let mut file = File::create("foo.txt").await?;
310 /// file.write_all(b"hello, world!").await?;
311 /// file.set_len(10).await?;
312 /// # Ok(())
313 /// # }
314 /// ```
315 pub async fn set_len(&mut self, size: u64) -> io::Result<()> {
316 self.complete_inflight().await;
317
318 let mut buf = match self.state {
319 Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
320 _ => unreachable!(),
321 };
322
323 let seek = if !buf.is_empty() {
324 Some(SeekFrom::Current(buf.discard_read()))
325 } else {
326 None
327 };
328
329 let std = self.std.clone();
330
331 self.state = Busy(sys::run(move || {
332 let res = if let Some(seek) = seek {
333 (&*std).seek(seek).and_then(|_| std.set_len(size))
334 } else {
335 std.set_len(size)
336 }
337 .map(|_| 0); // the value is discarded later
338
339 // Return the result as a seek
340 (Operation::Seek(res), buf)
341 }));
342
343 let (op, buf) = match self.state {
344 Idle(_) => unreachable!(),
345 Busy(ref mut rx) => rx.await?,
346 };
347
348 self.state = Idle(Some(buf));
349
350 match op {
351 Operation::Seek(res) => res.map(|_| ()),
352 _ => unreachable!(),
353 }
354 }
355
356 /// Queries metadata about the underlying file.
357 ///
358 /// # Examples
359 ///
360 /// ```no_run
361 /// use tokio::fs::File;
362 ///
363 /// # async fn dox() -> std::io::Result<()> {
364 /// let file = File::open("foo.txt").await?;
365 /// let metadata = file.metadata().await?;
366 ///
367 /// println!("{:?}", metadata);
368 /// # Ok(())
369 /// # }
370 /// ```
371 pub async fn metadata(&self) -> io::Result<Metadata> {
372 let std = self.std.clone();
373 asyncify(move || std.metadata()).await
374 }
375
376 /// Create a new `File` instance that shares the same underlying file handle
377 /// as the existing `File` instance. Reads, writes, and seeks will affect both
378 /// File instances simultaneously.
379 ///
380 /// # Examples
381 ///
382 /// ```no_run
383 /// use tokio::fs::File;
384 ///
385 /// # async fn dox() -> std::io::Result<()> {
386 /// let file = File::open("foo.txt").await?;
387 /// let file_clone = file.try_clone().await?;
388 /// # Ok(())
389 /// # }
390 /// ```
391 pub async fn try_clone(&self) -> io::Result<File> {
392 let std = self.std.clone();
393 let std_file = asyncify(move || std.try_clone()).await?;
394 Ok(File::from_std(std_file))
395 }
396
397 /// Destructures `File` into a [`std::fs::File`][std]. This function is
398 /// async to allow any in-flight operations to complete.
399 ///
400 /// Use `File::try_into_std` to attempt conversion immediately.
401 ///
402 /// [std]: std::fs::File
403 ///
404 /// # Examples
405 ///
406 /// ```no_run
407 /// use tokio::fs::File;
408 ///
409 /// # async fn dox() -> std::io::Result<()> {
410 /// let tokio_file = File::open("foo.txt").await?;
411 /// let std_file = tokio_file.into_std().await;
412 /// # Ok(())
413 /// # }
414 /// ```
415 pub async fn into_std(mut self) -> sys::File {
416 self.complete_inflight().await;
417 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
418 }
419
420 /// Tries to immediately destructure `File` into a [`std::fs::File`][std].
421 ///
422 /// [std]: std::fs::File
423 ///
424 /// # Errors
425 ///
426 /// This function will return an error containing the file if some
427 /// operation is in-flight.
428 ///
429 /// # Examples
430 ///
431 /// ```no_run
432 /// use tokio::fs::File;
433 ///
434 /// # async fn dox() -> std::io::Result<()> {
435 /// let tokio_file = File::open("foo.txt").await?;
436 /// let std_file = tokio_file.try_into_std().unwrap();
437 /// # Ok(())
438 /// # }
439 /// ```
440 pub fn try_into_std(mut self) -> Result<sys::File, Self> {
441 match Arc::try_unwrap(self.std) {
442 Ok(file) => Ok(file),
443 Err(std_file_arc) => {
444 self.std = std_file_arc;
445 Err(self)
446 }
447 }
448 }
449
450 /// Changes the permissions on the underlying file.
451 ///
452 /// # Platform-specific behavior
453 ///
454 /// This function currently corresponds to the `fchmod` function on Unix and
455 /// the `SetFileInformationByHandle` function on Windows. Note that, this
456 /// [may change in the future][changes].
457 ///
458 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
459 ///
460 /// # Errors
461 ///
462 /// This function will return an error if the user lacks permission change
463 /// attributes on the underlying file. It may also return an error in other
464 /// os-specific unspecified cases.
465 ///
466 /// # Examples
467 ///
468 /// ```no_run
469 /// use tokio::fs::File;
470 ///
471 /// # async fn dox() -> std::io::Result<()> {
472 /// let file = File::open("foo.txt").await?;
473 /// let mut perms = file.metadata().await?.permissions();
474 /// perms.set_readonly(true);
475 /// file.set_permissions(perms).await?;
476 /// # Ok(())
477 /// # }
478 /// ```
479 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
480 let std = self.std.clone();
481 asyncify(move || std.set_permissions(perm)).await
482 }
483
484 async fn complete_inflight(&mut self) {
485 use crate::future::poll_fn;
486
487 if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
488 self.last_write_err = Some(e.kind());
489 }
490 }
491}
492
493impl AsyncRead for File {
494 fn poll_read(
495 mut self: Pin<&mut Self>,
496 cx: &mut Context<'_>,
497 dst: &mut [u8],
498 ) -> Poll<io::Result<usize>> {
499 loop {
500 match self.state {
501 Idle(ref mut buf_cell) => {
502 let mut buf = buf_cell.take().unwrap();
503
504 if !buf.is_empty() {
505 let n = buf.copy_to(dst);
506 *buf_cell = Some(buf);
507 return Ready(Ok(n));
508 }
509
510 buf.ensure_capacity_for(dst);
511 let std = self.std.clone();
512
513 self.state = Busy(sys::run(move || {
514 let res = buf.read_from(&mut &*std);
515 (Operation::Read(res), buf)
516 }));
517 }
518 Busy(ref mut rx) => {
519 let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
520
521 match op {
522 Operation::Read(Ok(_)) => {
523 let n = buf.copy_to(dst);
524 self.state = Idle(Some(buf));
525 return Ready(Ok(n));
526 }
527 Operation::Read(Err(e)) => {
528 assert!(buf.is_empty());
529
530 self.state = Idle(Some(buf));
531 return Ready(Err(e));
532 }
533 Operation::Write(Ok(_)) => {
534 assert!(buf.is_empty());
535 self.state = Idle(Some(buf));
536 continue;
537 }
538 Operation::Write(Err(e)) => {
539 assert!(self.last_write_err.is_none());
540 self.last_write_err = Some(e.kind());
541 self.state = Idle(Some(buf));
542 }
543 Operation::Seek(_) => {
544 assert!(buf.is_empty());
545 self.state = Idle(Some(buf));
546 continue;
547 }
548 }
549 }
550 }
551 }
552 }
553}
554
555impl AsyncSeek for File {
556 fn start_seek(
557 mut self: Pin<&mut Self>,
558 cx: &mut Context<'_>,
559 mut pos: SeekFrom,
560 ) -> Poll<io::Result<()>> {
561 loop {
562 match self.state {
563 Idle(ref mut buf_cell) => {
564 let mut buf = buf_cell.take().unwrap();
565
566 // Factor in any unread data from the buf
567 if !buf.is_empty() {
568 let n = buf.discard_read();
569
570 if let SeekFrom::Current(ref mut offset) = pos {
571 *offset += n;
572 }
573 }
574
575 let std = self.std.clone();
576
577 self.state = Busy(sys::run(move || {
578 let res = (&*std).seek(pos);
579 (Operation::Seek(res), buf)
580 }));
581
582 return Ready(Ok(()));
583 }
584 Busy(ref mut rx) => {
585 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
586 self.state = Idle(Some(buf));
587
588 match op {
589 Operation::Read(_) => {}
590 Operation::Write(Err(e)) => {
591 assert!(self.last_write_err.is_none());
592 self.last_write_err = Some(e.kind());
593 }
594 Operation::Write(_) => {}
595 Operation::Seek(_) => {}
596 }
597 }
598 }
599 }
600 }
601
602 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
603 loop {
604 match self.state {
605 Idle(_) => panic!("must call start_seek before calling poll_complete"),
606 Busy(ref mut rx) => {
607 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
608 self.state = Idle(Some(buf));
609
610 match op {
611 Operation::Read(_) => {}
612 Operation::Write(Err(e)) => {
613 assert!(self.last_write_err.is_none());
614 self.last_write_err = Some(e.kind());
615 }
616 Operation::Write(_) => {}
617 Operation::Seek(res) => return Ready(res),
618 }
619 }
620 }
621 }
622 }
623}
624
625impl AsyncWrite for File {
626 fn poll_write(
627 mut self: Pin<&mut Self>,
628 cx: &mut Context<'_>,
629 src: &[u8],
630 ) -> Poll<io::Result<usize>> {
631 if let Some(e) = self.last_write_err.take() {
632 return Ready(Err(e.into()));
633 }
634
635 loop {
636 match self.state {
637 Idle(ref mut buf_cell) => {
638 let mut buf = buf_cell.take().unwrap();
639
640 let seek = if !buf.is_empty() {
641 Some(SeekFrom::Current(buf.discard_read()))
642 } else {
643 None
644 };
645
646 let n = buf.copy_from(src);
647 let std = self.std.clone();
648
649 self.state = Busy(sys::run(move || {
650 let res = if let Some(seek) = seek {
651 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
652 } else {
653 buf.write_to(&mut &*std)
654 };
655
656 (Operation::Write(res), buf)
657 }));
658
659 return Ready(Ok(n));
660 }
661 Busy(ref mut rx) => {
662 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
663 self.state = Idle(Some(buf));
664
665 match op {
666 Operation::Read(_) => {
667 // We don't care about the result here. The fact
668 // that the cursor has advanced will be reflected in
669 // the next iteration of the loop
670 continue;
671 }
672 Operation::Write(res) => {
673 // If the previous write was successful, continue.
674 // Otherwise, error.
675 res?;
676 continue;
677 }
678 Operation::Seek(_) => {
679 // Ignore the seek
680 continue;
681 }
682 }
683 }
684 }
685 }
686 }
687
688 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
689 if let Some(e) = self.last_write_err.take() {
690 return Ready(Err(e.into()));
691 }
692
693 let (op, buf) = match self.state {
694 Idle(_) => return Ready(Ok(())),
695 Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
696 };
697
698 // The buffer is not used here
699 self.state = Idle(Some(buf));
700
701 match op {
702 Operation::Read(_) => Ready(Ok(())),
703 Operation::Write(res) => Ready(res),
704 Operation::Seek(_) => Ready(Ok(())),
705 }
706 }
707
708 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
709 Poll::Ready(Ok(()))
710 }
711}
712
713impl From<sys::File> for File {
714 fn from(std: sys::File) -> Self {
715 Self::from_std(std)
716 }
717}
718
719impl fmt::Debug for File {
720 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
721 fmt.debug_struct("tokio::fs::File")
722 .field("std", &self.std)
723 .finish()
724 }
725}
726
727#[cfg(unix)]
728impl std::os::unix::io::AsRawFd for File {
729 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
730 self.std.as_raw_fd()
731 }
732}
733
/// Exposes the raw handle of the underlying file (Windows only).
#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for File {
    fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
        let handle = self.std.as_raw_handle();
        handle
    }
}
739}