1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//! Stream adapter that reads a file from the file system in fixed-size chunks.
use std::fs::File;
use std::io::Read;
use bytes::Bytes;

use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use std::pin::Pin;
use std::error::Error;
use crate::{
    safe_eject,
    errors::error::{
        SurrealError,
        SurrealErrorStatus
    }
};


/// Stream adapter for file system.
///
/// Pairs an open file with the chunk size used when reading it. The `Stream`
/// implementation for this type yields the file contents as consecutive
/// `Bytes` chunks of at most `chunk_size` bytes each.
///
/// # Fields
/// * `chunk_size` - The size of the chunks to read from the file.
/// * `file_pointer` - The open handle of the file to be streamed.
pub struct StreamAdapter {
    /// Upper bound, in bytes, on the size of each chunk read from the file.
    chunk_size: usize,
    /// Open file handle; each poll of the stream reads the next chunk from it.
    file_pointer: File
}

impl StreamAdapter {

    /// Creates a new `StreamAdapter` struct.
    /// 
    /// # Arguments
    /// * `chunk_size` - The size of the chunks to read from the file.
    /// * `file_path` - The path to the file to be streamed
    /// 
    /// # Returns
    /// A new `StreamAdapter` struct.
    pub fn new(chunk_size: usize, file_path: String) -> Result<Self, SurrealError> {
        let file_pointer = safe_eject!(File::open(file_path), SurrealErrorStatus::NotFound);
        Ok(StreamAdapter {
            chunk_size,
            file_pointer
        })
    }

}

impl Stream for StreamAdapter {

    /// Each item is either the next chunk of file bytes or the I/O error that
    /// prevented reading it.
    type Item = Result<Bytes, Box<dyn Error + Send + Sync>>;

    /// Polls the next chunk from the file.
    ///
    /// The read is performed synchronously with `std::io::Read::read`, so this
    /// method never returns `Poll::Pending`; it always resolves to
    /// `Poll::Ready`.
    ///
    /// # Arguments
    /// * `self` - The `StreamAdapter` struct.
    /// * `_cx` - The task context. Unused: the read completes inside this call,
    ///   so no waker ever needs to be registered.
    ///
    /// # Returns
    /// * `Poll::Ready(Some(Ok(bytes)))` - the next chunk, holding between 1 and
    ///   `chunk_size` bytes (a read may return fewer bytes than requested).
    /// * `Poll::Ready(Some(Err(e)))` - the read failed with an I/O error other
    ///   than `ErrorKind::Interrupted`.
    /// * `Poll::Ready(None)` - the read returned zero bytes, i.e. end of file
    ///   (or `chunk_size` is 0, which leaves no room to read anything).
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut buffer = vec![0u8; self.chunk_size];

        // `Interrupted` is documented as non-fatal for `Read::read`: retry the
        // read instead of surfacing a spurious error to the stream consumer.
        let bytes_read = loop {
            match self.file_pointer.read(&mut buffer) {
                Ok(count) => break count,
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Poll::Ready(Some(Err(err.into()))),
            }
        };

        // Zero bytes read marks the end of the stream.
        if bytes_read == 0 {
            return Poll::Ready(None);
        }

        // Drop the unused tail so the chunk holds only the bytes actually read.
        buffer.truncate(bytes_read);
        Poll::Ready(Some(Ok(buffer.into())))
    }
}