openssh_sftp_client/fs/
dir.rsuse crate::{
cancel_error,
lowlevel::NameEntry,
metadata::{FileType, MetaData},
Error,
};
use super::Dir;
use std::{
borrow::Cow,
future::Future,
path::Path,
pin::Pin,
task::{ready, Context, Poll},
vec::IntoIter,
};
use futures_core::stream::{FusedStream, Stream};
use pin_project::{pin_project, pinned_drop};
use tokio_util::sync::WaitForCancellationFutureOwned;
type ResponseFuture = crate::lowlevel::AwaitableNameEntriesFuture<crate::Buffer>;
#[repr(transparent)]
#[derive(Debug, Clone)]
pub struct DirEntry(NameEntry);
impl DirEntry {
pub fn filename(&self) -> &Path {
&self.0.filename
}
pub fn filename_mut(&mut self) -> &mut Box<Path> {
&mut self.0.filename
}
pub fn metadata(&self) -> MetaData {
MetaData::new(self.0.attrs)
}
pub fn file_type(&self) -> Option<FileType> {
self.metadata().file_type()
}
}
#[derive(Debug)]
#[pin_project(PinnedDrop)]
pub struct ReadDir {
dir: Dir,
future: Option<ResponseFuture>,
entries: Option<IntoIter<NameEntry>>,
#[pin]
cancellation_fut: WaitForCancellationFutureOwned,
}
impl ReadDir {
pub(super) fn new(dir: Dir) -> Self {
Self {
cancellation_fut: dir.0.get_auxiliary().cancel_token.clone().cancelled_owned(),
dir,
future: None,
entries: Some(Vec::new().into_iter()),
}
}
fn new_request(dir: &mut Dir) -> Result<ResponseFuture, Error> {
let owned_handle = &mut dir.0;
let id = owned_handle.get_id_mut();
let handle = &owned_handle.handle;
let write_end = &mut owned_handle.write_end.inner;
let future = write_end
.send_readdir_request(id, Cow::Borrowed(handle))?
.wait();
owned_handle.get_auxiliary().wakeup_flush_task();
Ok(future)
}
}
impl Stream for ReadDir {
type Item = Result<DirEntry, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let future = this.future;
let entries = match &mut *this.entries {
Some(entries) => entries,
None => return Poll::Ready(None),
};
if entries.as_slice().is_empty() {
let dir = &mut *this.dir;
let cancellation_fut = this.cancellation_fut;
let fut = match future {
Some(future) => future,
None => {
*future = Some(Self::new_request(dir)?);
future.as_mut().unwrap()
}
};
let res = {
let fut = async move {
tokio::select! {
biased;
_ = cancellation_fut => Err(cancel_error()),
res = fut => res,
}
};
tokio::pin!(fut);
ready!(fut.poll(cx))
};
*future = None; let (id, ret) = res?;
this.dir.0.cache_id_mut(id);
if ret.is_empty() {
*this.entries = None;
return Poll::Ready(None);
} else {
*entries = Vec::from(ret).into_iter();
}
}
debug_assert!(future.is_none());
debug_assert!(!entries.as_slice().is_empty());
Poll::Ready(entries.next().map(DirEntry).map(Ok))
}
}
impl FusedStream for ReadDir {
fn is_terminated(&self) -> bool {
self.entries.is_none()
}
}
impl ReadDir {
async fn do_drop(mut dir: Dir, future: Option<ResponseFuture>) {
if let Some(future) = future {
if let Ok((id, _)) = future.await {
dir.0.cache_id_mut(id);
}
}
if let Err(_err) = dir.close().await {
#[cfg(feature = "tracing")]
tracing::error!(?_err, "failed to close handle");
}
}
}
#[pinned_drop]
impl PinnedDrop for ReadDir {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
let dir = this.dir.clone();
let future = this.future.take();
let cancellation_fut = dir.0.get_auxiliary().cancel_token.clone().cancelled_owned();
let do_drop_fut = Self::do_drop(dir, future);
this.dir.0.get_auxiliary().tokio_handle().spawn(async move {
tokio::select! {
biased;
_ = cancellation_fut => (),
_ = do_drop_fut => (),
}
});
}
}