taskcluster_upload/
factory.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use std::io::{Cursor, SeekFrom};
4use tokio::fs::File;
5use tokio::io::{AsyncRead, AsyncSeekExt};
6
7/// An AsyncReaderFactory can produce, on demand, an [AsyncRead] object.  In the event of an upload
8/// failure, the restarted upload will use a fresh reader to start reading object content at the
9/// beginning.
10#[async_trait]
11pub trait AsyncReaderFactory {
12    /// Get a fresh [AsyncRead] object, positioned at the beginning of the data to be uploaded.
13    async fn get_reader<'a>(
14        &'a mut self,
15    ) -> Result<Box<dyn AsyncRead + Sync + Send + Unpin + 'static>>;
16}
17
/// A CursorReaderFactory creates [AsyncRead] objects from a [std::io::Cursor], allowing uploads
/// from in-memory buffers.  Note that this struct clones the given data for each retry, although
/// this behavior may be optimized in the future.
///
/// The single field is the owned copy of the bytes to upload.
#[derive(Debug, Clone)]
pub struct CursorReaderFactory(Vec<u8>);
22
23#[async_trait]
24impl AsyncReaderFactory for CursorReaderFactory {
25    async fn get_reader(&mut self) -> Result<Box<dyn AsyncRead + Sync + Send + Unpin + 'static>> {
26        Ok(Box::new(Cursor::new(self.0.clone())))
27    }
28}
29
30impl CursorReaderFactory {
31    pub fn new(buf: &[u8]) -> Self {
32        Self(buf.to_vec())
33    }
34}
35
36/// A FileReaderFactory creates [AsyncRead] objects by rewinding and cloning a file.  The given
37/// file must be clonable (that is, [File::try_clone()] must succeed).
38pub struct FileReaderFactory(File);
39
40#[async_trait]
41impl AsyncReaderFactory for FileReaderFactory {
42    async fn get_reader<'a>(
43        &'a mut self,
44    ) -> Result<Box<dyn AsyncRead + Sync + Send + Unpin + 'static>> {
45        let mut file = self.0.try_clone().await?;
46        file.seek(SeekFrom::Start(0)).await?;
47        Ok(Box::new(file))
48    }
49}
50
51impl FileReaderFactory {
52    pub fn new(file: File) -> Self {
53        Self(file)
54    }
55}
56
#[cfg(test)]
mod test {
    use super::*;
    use anyhow::Result;
    use tempfile::tempfile;
    use tokio::io::{copy, AsyncWriteExt};

    /// Payload used by every test in this module.
    const DATA: &[u8] = b"HELLO/WORLD";

    /// Request a fresh reader from `factory` and drain it completely into memory.
    ///
    /// Panics if the factory cannot produce a reader; copy failures are returned to the caller.
    async fn copy_from_factory<F>(factory: &mut F) -> std::io::Result<Vec<u8>>
    where
        F: AsyncReaderFactory,
    {
        let mut source = factory.get_reader().await.unwrap();
        let mut sink: Vec<u8> = Vec::new();
        copy(&mut source, &mut sink).await?;
        Ok(sink)
    }

    /// Two consecutive readers from a cursor-backed factory both yield the full payload.
    #[tokio::test]
    async fn cursor_reader_twice() -> Result<()> {
        let mut factory = CursorReaderFactory::new(DATA);
        for _ in 0..2 {
            let content = copy_from_factory(&mut factory).await?;
            assert_eq!(content, DATA);
        }
        Ok(())
    }

    /// Two consecutive readers from a file-backed factory both yield the full payload.
    #[tokio::test]
    async fn file_reader_twice() -> Result<()> {
        let mut file = File::from_std(tempfile()?);

        file.write_all(DATA).await?;
        // The factory clones this handle before reading from it.  Tokio buffers the write
        // internally and completes it on another thread, so flush first to make sure the data
        // is in place before the clone is read.
        file.flush().await?;

        let mut factory = FileReaderFactory::new(file);
        for _ in 0..2 {
            let content = copy_from_factory(&mut factory).await?;
            assert_eq!(content, DATA);
        }
        Ok(())
    }
}