taskcluster_upload/
factory.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use anyhow::Result;
use async_trait::async_trait;
use std::io::{Cursor, SeekFrom};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncSeekExt};

/// An AsyncReaderFactory can produce, on demand, an [AsyncRead] object.  In the event of an upload
/// failure, the restarted upload will use a fresh reader to start reading object content at the
/// beginning.
#[async_trait]
pub trait AsyncReaderFactory {
    /// Get a fresh [AsyncRead] object, positioned at the beginning of the data to be uploaded.
    async fn get_reader<'a>(
        &'a mut self,
    ) -> Result<Box<dyn AsyncRead + Sync + Send + Unpin + 'static>>;
}

/// A CusorReaderFactory creates [AsyncRead] objects from a [std::io::Cursor], allowing uploads
/// from in-memory buffers.  Note that this struct clones the given data for each retry, although
/// this behavior may be optimized in the future.
pub struct CursorReaderFactory(Vec<u8>);

#[async_trait]
impl AsyncReaderFactory for CursorReaderFactory {
    async fn get_reader(&mut self) -> Result<Box<dyn AsyncRead + Sync + Send + Unpin + 'static>> {
        Ok(Box::new(Cursor::new(self.0.clone())))
    }
}

impl CursorReaderFactory {
    pub fn new(buf: &[u8]) -> Self {
        Self(buf.to_vec())
    }
}

/// A FileReaderFactory creates [AsyncRead] objects by rewinding and cloning a file.  The given
/// file must be clonable (that is, [File::try_clone()] must succeed).
pub struct FileReaderFactory(File);

#[async_trait]
impl AsyncReaderFactory for FileReaderFactory {
    async fn get_reader<'a>(
        &'a mut self,
    ) -> Result<Box<dyn AsyncRead + Sync + Send + Unpin + 'static>> {
        let mut file = self.0.try_clone().await?;
        file.seek(SeekFrom::Start(0)).await?;
        Ok(Box::new(file))
    }
}

impl FileReaderFactory {
    pub fn new(file: File) -> Self {
        Self(file)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use anyhow::Result;
    use tempfile::tempfile;
    use tokio::io::{copy, AsyncWriteExt};

    const DATA: &[u8] = b"HELLO/WORLD";

    async fn copy_from_factory<F: AsyncReaderFactory>(factory: &mut F) -> std::io::Result<Vec<u8>> {
        let mut reader = factory.get_reader().await.unwrap();
        let mut writer = Cursor::new(Vec::new());
        copy(&mut reader, &mut writer).await?;
        Ok(writer.into_inner())
    }

    #[tokio::test]
    async fn cursor_reader_twice() -> Result<()> {
        let mut factory = CursorReaderFactory::new(DATA);
        assert_eq!(&copy_from_factory(&mut factory).await?, DATA);
        assert_eq!(&copy_from_factory(&mut factory).await?, DATA);
        Ok(())
    }

    #[tokio::test]
    async fn file_reader_twice() -> Result<()> {
        let mut file: File = tempfile()?.into();

        file.write_all(DATA).await?;
        // This file will be cloned before it is read by the factory, so flush
        // any buffered data before doing so. Tokio internally buffers the write
        // and completes it in another thread.
        file.flush().await?;

        let mut factory = FileReaderFactory::new(file);
        assert_eq!(&copy_from_factory(&mut factory).await?, DATA);
        assert_eq!(&copy_from_factory(&mut factory).await?, DATA);
        Ok(())
    }
}