1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
//! The implementation of creating an archive from a git tree, similar to `git archive`, but using an internal format.
//!
//! This crate can effectively be used to manipulate worktrees as streams of bytes, which can be decoded using the [`Stream`] type.
#![deny(rust_2018_idioms, missing_docs, unsafe_code)]

use std::{path::Path, sync::Arc};

use gix_object::bstr::BString;

/// A stream of entries that originate from a git tree and optionally from additional entries.
///
/// Note that a git tree is mandatory, but the empty tree can be used to effectively disable it.
pub struct Stream {
    /// The source of bytes in our internal format: either the reading end of a pipe (see `Stream::new()`)
    /// or an arbitrary boxed reader (see `Stream::from_read()`).
    read: utils::Read,
    /// A shared, mutex-protected slot that can hold at most one `entry::Error`.
    err: SharedErrorSlot,
    /// The sending end of the channel used by `Stream::add_entry()` to submit additional entries.
    ///
    /// It is `None` for streams created with `Stream::from_read()` and after `Stream::as_read_mut()` was called,
    /// in which case `Stream::add_entry()` panics.
    extra_entries: Option<std::sync::mpsc::Sender<AdditionalEntry>>,
    /// `None` if currently held by an entry.
    path_buf: Option<BString>,
    /// Another buffer to partially act like a buf-reader.
    buf: Vec<u8>,
    /// The offset into `buf` for entries being able to act like a buf reader.
    pos: usize,
    /// The amount of bytes usable from `buf` (even though it always has a fixed size)
    filled: usize,
}

// NOTE(review): the empty doc comment below presumably exists to satisfy the crate-wide `missing_docs` lint
// while the actual documentation lives inside the module itself - confirm against `entry.rs`.
///
#[allow(clippy::empty_docs)]
pub mod entry;
// Crate-internal module that is not part of the public API.
pub(crate) mod protocol;

// The module itself stays private, only its `from_tree` item is re-exported publicly.
mod from_tree;
pub use from_tree::from_tree;

/// A slot for at most one [`entry::Error`], reference-counted via `Arc` and guarded by a mutex
/// so that it can be shared between multiple owners.
pub(crate) type SharedErrorSlot = Arc<parking_lot::Mutex<Option<entry::Error>>>;

/// An entry in a stream. Note that they must be consumed fully, by reading from them till exhaustion.
///
/// ### Drop behaviour
///
/// If the entry is dropped without reading it till exhaustion, the stream is tainted and
/// [`next_entry()`][Stream::next_entry()] will panic next time it is called.
pub struct Entry<'a> {
    /// The kind of entry at [`relative_path`][Self::relative_path()].
    pub mode: gix_object::tree::EntryMode,
    /// The hash of the object, uniquely identifying it.
    pub id: gix_hash::ObjectId,
    /// The stream this entry belongs to, borrowed mutably for as long as the entry is alive.
    parent: &'a mut Stream,
    /// The path relative to the repository at which data should be written.
    ///
    /// NOTE(review): presumably this is the buffer taken from the parent's `path_buf` field, which is
    /// documented as `None` while held by an entry - confirm in the `entry` module.
    path_buf: Option<BString>,
    /// The amount of bytes left to read if the size of bytes to read is known.
    /// It's also our marker to say that we are depleted, which is important to signal to the
    /// parent stream that we can proceed reading the next entry.
    remaining: Option<usize>,
}

/// An entry that is [added to the stream][Stream::add_entry()] by the user, verbatim, without additional worktree conversions.
///
/// It may overwrite previously written paths, which may or may not work for the consumer of the stream.
pub struct AdditionalEntry {
    /// The hash of the object, uniquely identifying it.
    /// Note that it can be [`null()`][gix_hash::ObjectId::null()] as the hash is typically ignored by consumers of the stream.
    pub id: gix_hash::ObjectId,
    /// The kind of entry to create, for instance a blob, an executable blob, a symbolic link or a tree.
    pub mode: gix_object::tree::EntryMode,
    /// The path relative to the repository at which content should be located.
    pub relative_path: BString,
    /// Where to get the content of the entry from, see [`entry::Source`] for the available choices.
    pub source: entry::Source,
}

/// Lifecycle
impl Stream {
    /// Consume this instance and hand out the byte stream that represents the underlying git tree.
    ///
    /// The format of these bytes is unspecified. It is meant for transport only and must not be persisted.
    /// Pass the returned reader to [`Self::from_read()`] to decode the entries it contains.
    pub fn into_read(self) -> impl std::io::Read {
        let Self { read, .. } = self;
        read
    }

    /// Provide mutable access to the internal byte stream from which entries would be generated.
    ///
    /// Note that the stream must then be consumed in its entirety.
    pub fn as_read_mut(&mut self) -> &mut impl std::io::Read {
        // Let go of the sender: from here on, no additional entries can be added anymore.
        self.extra_entries = None;
        &mut self.read
    }

    /// Create a new instance that decodes entries from `read`, a stream of bytes in our format.
    ///
    /// To be compatible, `read` must have been obtained from [`Self::into_read()`], and its bytes
    /// must not have been persisted.
    pub fn from_read(read: impl std::io::Read + 'static) -> Self {
        // A zeroed scratch buffer of fixed size, `filled` and `pos` track the usable portion.
        let scratch = vec![0; u16::MAX as usize];
        let source = utils::Read::Unknown(Box::new(read));
        Self {
            read: source,
            err: Default::default(),
            // There is no producer to send additional entries to when merely decoding.
            extra_entries: None,
            path_buf: Some(Vec::with_capacity(1024).into()),
            buf: scratch,
            pos: 0,
            filled: 0,
        }
    }
}

/// Entries
impl Stream {
    /// Queue `entry` so that it is returned by calls to [`Self::next_entry()`].
    ///
    /// Additional entries are returned after the ones contained in the tree, in order of addition.
    ///
    /// # Panics
    ///
    /// If called after the first call to [`Self::next_entry()`], or generally if the sending end of the
    /// entries channel is gone, which is also the case after [`Self::as_read_mut()`] and for instances
    /// created with [`Self::from_read()`].
    pub fn add_entry(&mut self, entry: AdditionalEntry) -> &mut Self {
        let sender = self
            .extra_entries
            .as_ref()
            .expect("BUG: must not add entries after the start of entries traversal");
        sender
            .send(entry)
            .expect("Failure is impossible as thread blocks on the receiving end");
        self
    }

    /// Add the filesystem item at `path`, which is expected to be located under `root`, as entry to this stream.
    ///
    /// Symbolic links are added with their target as content, directories are added without content,
    /// and everything else is added as (possibly executable) blob whose content is read from `path` on demand.
    ///
    /// Note that the created entries will always have a null SHA1, and that `path` is accessed now to
    /// determine its type, and again once its content is requested.
    ///
    /// # Errors
    ///
    /// If `path` is not prefixed by `root`, if the metadata of `path` cannot be obtained, or if
    /// the target of a symbolic link cannot be read.
    ///
    /// # Panics
    ///
    /// Under the same conditions as [`Self::add_entry()`].
    pub fn add_entry_from_path(&mut self, root: &Path, path: &Path) -> std::io::Result<&mut Self> {
        let rela_path = path
            .strip_prefix(root)
            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
        // Do not follow symlinks, we want to know about the link itself.
        let meta = path.symlink_metadata()?;
        let relative_path = gix_path::to_unix_separators_on_windows(gix_path::into_bstr(rela_path)).into_owned();
        let id = gix_hash::ObjectId::null(gix_hash::Kind::Sha1);

        // Figure out what kind of entry this is and where its content comes from.
        let (kind, source) = if meta.is_symlink() {
            let target = std::fs::read_link(path)?;
            let target = gix_path::into_bstr(target).into_owned();
            (
                gix_object::tree::EntryKind::Link,
                entry::Source::Memory(target.into()),
            )
        } else if meta.is_dir() {
            (gix_object::tree::EntryKind::Tree, entry::Source::Null)
        } else if gix_fs::is_executable(&meta) {
            (
                gix_object::tree::EntryKind::BlobExecutable,
                entry::Source::Path(path.to_owned()),
            )
        } else {
            (
                gix_object::tree::EntryKind::Blob,
                entry::Source::Path(path.to_owned()),
            )
        };

        let entry = AdditionalEntry {
            id,
            mode: kind.into(),
            relative_path,
            source,
        };
        Ok(self.add_entry(entry))
    }
}

impl Stream {
    /// Create a new stream that reads from a pipe, and return it along with the writing end of that pipe
    /// and the receiving end of the channel through which additional entries are submitted.
    pub(crate) fn new() -> (
        Stream,
        gix_features::io::pipe::Writer,
        std::sync::mpsc::Receiver<AdditionalEntry>,
    ) {
        // The amount of writes that may be queued in the pipe: a handful of writes per entry
        // (header, hash, path), for 32 entries.
        // Some slack, paid for with memory, lets the consumer take off bytes more quickly. Without it,
        // producer and consumer threads effectively run in lock-step, which nullifies the benefit.
        // NOTE(review): the previous comment counted four writes per entry (header, hash, path and one
        // spare) while the factor used here is three - confirm which one is intended.
        let in_flight_writes = (2 + 1) * 32;
        let (pipe_writer, pipe_reader) = gix_features::io::pipe::unidirectional(in_flight_writes);
        let (entries_tx, entries_rx) = std::sync::mpsc::channel();

        let stream = Stream {
            read: utils::Read::Known(pipe_reader),
            err: Default::default(),
            extra_entries: Some(entries_tx),
            path_buf: Some(Vec::with_capacity(1024).into()),
            // A zeroed scratch buffer of fixed size, `filled` and `pos` track the usable portion.
            buf: vec![0; u16::MAX as usize],
            pos: 0,
            filled: 0,
        };
        (stream, pipe_writer, entries_rx)
    }
}

pub(crate) mod utils {
    /// The byte source of a stream, which is either a reader whose concrete type we know,
    /// or any other reader hidden behind a trait object.
    pub enum Read {
        /// The reading end of a `gix_features` pipe.
        Known(gix_features::io::pipe::Reader),
        /// An arbitrary, boxed reader.
        Unknown(Box<dyn std::io::Read>),
    }

    impl std::io::Read for Read {
        /// Forward the call to the wrapped reader, whichever one it is.
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            match self {
                Self::Known(pipe) => pipe.read(buf),
                Self::Unknown(boxed) => boxed.read(buf),
            }
        }
    }
}