openssh_sftp_client/fs/
dir.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
use crate::{
    cancel_error,
    lowlevel::NameEntry,
    metadata::{FileType, MetaData},
    Error,
};

use super::Dir;

use std::{
    borrow::Cow,
    future::Future,
    path::Path,
    pin::Pin,
    task::{ready, Context, Poll},
    vec::IntoIter,
};

use futures_core::stream::{FusedStream, Stream};
use pin_project::{pin_project, pinned_drop};
use tokio_util::sync::WaitForCancellationFutureOwned;

type ResponseFuture = crate::lowlevel::AwaitableNameEntriesFuture<crate::Buffer>;

/// Entries returned by the [`ReadDir`].
///
/// This is a specialized version of [`std::fs::DirEntry`].
#[repr(transparent)]
#[derive(Debug, Clone)]
pub struct DirEntry(NameEntry);

impl DirEntry {
    /// Return filename of the dir entry.
    pub fn filename(&self) -> &Path {
        &self.0.filename
    }

    /// Return filename of the dir entry as a mutable reference.
    pub fn filename_mut(&mut self) -> &mut Box<Path> {
        &mut self.0.filename
    }

    /// Return metadata for the dir entry.
    pub fn metadata(&self) -> MetaData {
        MetaData::new(self.0.attrs)
    }

    /// Return the file type for the dir entry.
    pub fn file_type(&self) -> Option<FileType> {
        self.metadata().file_type()
    }
}

/// Reads the the entries in a directory.
#[derive(Debug)]
#[pin_project(PinnedDrop)]
pub struct ReadDir {
    dir: Dir,

    // future and entries contain the state
    //
    // Invariant:
    //  - entries.is_none() => future.is_none()
    //  - If entries.is_some(), then future.is_none() ^ entries.unwrap().as_slice().is_empty()
    future: Option<ResponseFuture>,
    entries: Option<IntoIter<NameEntry>>,

    /// cancellation_fut is not only cancel-safe, but also can be polled after
    /// it is ready.
    ///
    /// Once it is ready, all polls after that immediately return Poll::Ready(())
    #[pin]
    cancellation_fut: WaitForCancellationFutureOwned,
}

impl ReadDir {
    pub(super) fn new(dir: Dir) -> Self {
        Self {
            cancellation_fut: dir.0.get_auxiliary().cancel_token.clone().cancelled_owned(),
            dir,
            future: None,
            entries: Some(Vec::new().into_iter()),
        }
    }

    fn new_request(dir: &mut Dir) -> Result<ResponseFuture, Error> {
        let owned_handle = &mut dir.0;

        let id = owned_handle.get_id_mut();
        let handle = &owned_handle.handle;
        let write_end = &mut owned_handle.write_end.inner;

        let future = write_end
            .send_readdir_request(id, Cow::Borrowed(handle))?
            .wait();

        // Requests is already added to write buffer, so wakeup
        // the `flush_task` if necessary.
        owned_handle.get_auxiliary().wakeup_flush_task();

        Ok(future)
    }
}

impl Stream for ReadDir {
    type Item = Result<DirEntry, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let future = this.future;

        let entries = match &mut *this.entries {
            Some(entries) => entries,
            None => return Poll::Ready(None),
        };

        if entries.as_slice().is_empty() {
            let dir = &mut *this.dir;
            let cancellation_fut = this.cancellation_fut;

            let fut = match future {
                Some(future) => future,
                None => {
                    *future = Some(Self::new_request(dir)?);
                    future.as_mut().unwrap()
                }
            };

            let res = {
                let fut = async move {
                    tokio::select! {
                        biased;

                        _ = cancellation_fut => Err(cancel_error()),
                        res = fut => res,
                    }
                };

                tokio::pin!(fut);

                ready!(fut.poll(cx))
            };
            *future = None; // future is ready, reset it to None
            let (id, ret) = res?;

            this.dir.0.cache_id_mut(id);
            if ret.is_empty() {
                *this.entries = None;
                return Poll::Ready(None);
            } else {
                *entries = Vec::from(ret).into_iter();
            }
        }

        debug_assert!(future.is_none());
        debug_assert!(!entries.as_slice().is_empty());

        Poll::Ready(entries.next().map(DirEntry).map(Ok))
    }
}

impl FusedStream for ReadDir {
    fn is_terminated(&self) -> bool {
        self.entries.is_none()
    }
}

impl ReadDir {
    async fn do_drop(mut dir: Dir, future: Option<ResponseFuture>) {
        if let Some(future) = future {
            if let Ok((id, _)) = future.await {
                dir.0.cache_id_mut(id);
            }
        }

        if let Err(_err) = dir.close().await {
            #[cfg(feature = "tracing")]
            tracing::error!(?_err, "failed to close handle");
        }
    }
}

/// We need to keep polling the future stored internally, otherwise it would
/// drop the internal request ids too early, causing read task to fail
/// when they should not fail.
#[pinned_drop]
impl PinnedDrop for ReadDir {
    fn drop(self: Pin<&mut Self>) {
        let this = self.project();

        let dir = this.dir.clone();
        let future = this.future.take();

        let cancellation_fut = dir.0.get_auxiliary().cancel_token.clone().cancelled_owned();
        let do_drop_fut = Self::do_drop(dir, future);

        this.dir.0.get_auxiliary().tokio_handle().spawn(async move {
            tokio::select! {
                biased;

                _ = cancellation_fut => (),
                _ = do_drop_fut => (),
            }
        });
    }
}