gix_protocol/fetch/
function.rs

1use crate::fetch::{
2    negotiate, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, Shallow, Tags,
3};
4use crate::{fetch::Arguments, transport::packetline::read::ProgressAction};
5use gix_features::progress::DynNestedProgress;
6use std::path::Path;
7use std::sync::atomic::{AtomicBool, Ordering};
8
/// Perform one fetch operation, relying on a `transport`.
///
/// `negotiate` is used to run the negotiation of objects that should be contained in the pack, *if* one is to be received.
/// `progress` and `should_interrupt` are passed to all potentially long-running parts of the operation.
///
/// `consume_pack(pack_read, progress, interrupt) -> bool` is called at most once, namely when the server announced a pack,
/// to consume all bytes that are sent by the server. It returns `true` if we should assure the pack is read to the end,
/// or `false` to do nothing. Dropping the reader without reading to EOF (i.e. returning `false`) is an offense to the server, and
/// `transport` won't be in the correct state to perform additional operations, or indicate the end of operation.
/// Note that the passed reader is blocking, as the pack-writing is blocking as well.
///
/// The `Context` and `Options` further define parts of this `fetch` operation.
///
/// As opposed to a full `git fetch`, this operation does *not*…
///
/// * …update local refs
/// * …end the interaction after the fetch
///
/// **Note that the interaction will never be ended**, even on error or failure, leaving it up to the caller to do that, maybe
/// with the help of [`SendFlushOnDrop`](crate::SendFlushOnDrop) which can wrap `transport`.
/// Generally, the `transport` is left in a state that allows for more commands to be run.
///
/// Return `Ok(None)` if there was nothing to do because all remote refs are at the same state as they are locally,
/// or there was nothing wanted, or `Ok(Some(outcome))` to inform about all the changes that were made.
///
/// # Errors
///
/// * [`Error::MissingServerFeature`] if `tags` is [`Tags::Included`] but `include-tag` can't be used, or if shallow
///   arguments are needed but can't be used.
/// * [`Error::Negotiate`] with `NegotiationFailed` if `should_interrupt` is set at the beginning of a negotiation round,
///   or if a negotiation round itself fails.
/// * [`Error::RejectShallowRemote`] if the response contains shallow updates while no shallow lock is held
///   and `reject_shallow_remote` is `true`.
/// * [`Error::ConsumePack`] if `consume_pack` returns an error.
/// * [`Error::ReadRemainingBytes`] if draining the rest of the response fails.
/// * Any other error that is propagated from checking required features, sending arguments, parsing the response,
///   or locking, reading and writing the shallow file.
#[maybe_async::maybe_async]
pub async fn fetch<P, T, E>(
    negotiate: &mut impl Negotiate,
    consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<bool, E>,
    mut progress: P,
    should_interrupt: &AtomicBool,
    Context {
        handshake,
        transport,
        user_agent,
        trace_packetlines,
    }: Context<'_, T>,
    Options {
        shallow_file,
        shallow,
        tags,
        reject_shallow_remote,
    }: Options<'_>,
) -> Result<Option<Outcome>, Error>
where
    P: gix_features::progress::NestedProgress,
    P::SubProgress: 'static,
    T: gix_transport::client::Transport,
    E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
{
    let _span = gix_trace::coarse!("gix_protocol::fetch()");
    // Move the shallow updates out of the handshake so they can be appended to the final response further below.
    let v1_shallow_updates = handshake.v1_shallow_updates.take();
    let protocol_version = handshake.server_protocol_version;

    // The features to send along with the `fetch` command: the command's defaults plus our user agent.
    let fetch = crate::Command::Fetch;
    let fetch_features = {
        let mut f = fetch.default_features(protocol_version, &handshake.capabilities);
        f.push(user_agent);
        f
    };

    crate::fetch::Response::check_required_features(protocol_version, &fetch_features)?;
    // With `sideband-all`, the remote progress handler is installed before the response is read,
    // otherwise only once the pack is about to be received.
    let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
    let mut arguments = Arguments::new(protocol_version, fetch_features, trace_packetlines);
    if matches!(tags, Tags::Included) {
        if !arguments.can_use_include_tag() {
            return Err(Error::MissingServerFeature {
                    feature: "include-tag",
                    description:
                    // NOTE: if this is an issue, we could probably do what's proposed here.
                    "To make this work we would have to implement another pass to fetch attached tags separately",
                });
        }
        arguments.use_include_tag();
    }
    // Adds the shallow-related arguments and takes the lock on the shallow file if a change to it is expected.
    let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, shallow, &shallow_file)?;

    let negotiate_span = gix_trace::detail!(
        "negotiate",
        protocol_version = handshake.server_protocol_version as usize
    );
    let action = negotiate.mark_complete_and_common_ref()?;
    let mut previous_response = None::<crate::fetch::Response>;
    match &action {
        // No negotiation is needed, so no pack is requested either.
        negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => Ok(None),
        negotiate::Action::MustNegotiate {
            remote_ref_target_known,
        } => {
            // `false` means there is nothing to ask the server for.
            if !negotiate.add_wants(&mut arguments, remote_ref_target_known) {
                return Ok(None);
            }
            let mut rounds = Vec::new();
            let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests());
            let mut state = negotiate::one_round::State::new(is_stateless);
            // Run negotiation rounds until a response announces a pack, and keep the reader positioned at that pack.
            let mut reader = 'negotiation: loop {
                let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1);
                progress.step();
                progress.set_name(format!("negotiate (round {})", rounds.len() + 1));
                // An interruption is reported as failed negotiation, along with the rounds completed so far.
                if should_interrupt.load(Ordering::Relaxed) {
                    return Err(Error::Negotiate(negotiate::Error::NegotiationFailed {
                        rounds: rounds.len(),
                    }));
                }

                let is_done = match negotiate.one_round(&mut state, &mut arguments, previous_response.as_ref()) {
                    Ok((round, is_done)) => {
                        rounds.push(round);
                        is_done
                    }
                    Err(err) => {
                        return Err(err.into());
                    }
                };
                let mut reader = arguments.send(transport, is_done).await?;
                if sideband_all {
                    setup_remote_progress(&mut progress, &mut reader, should_interrupt);
                }
                let response =
                    crate::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done, !is_done).await?;
                let has_pack = response.has_pack();
                // Keep the response as input for the next round, and as the final response once a pack was announced.
                previous_response = Some(response);
                if has_pack {
                    progress.step();
                    progress.set_name("receiving pack".into());
                    if !sideband_all {
                        setup_remote_progress(&mut progress, &mut reader, should_interrupt);
                    }
                    break 'negotiation reader;
                }
            };
            drop(negotiate_span);

            let mut previous_response = previous_response.expect("knowledge of a pack means a response was received");
            previous_response.append_v1_shallow_updates(v1_shallow_updates);
            // The server sent shallow updates even though no lock was taken up front:
            // either refuse them, or take the lock now so they can be written after the pack was consumed.
            if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
                if reject_shallow_remote {
                    return Err(Error::RejectShallowRemote);
                }
                shallow_lock = acquire_shallow_lock(&shallow_file).map(Some)?;
            }

            // `consume_pack` takes a `std::io::BufRead`, so the async reader is wrapped in `BlockOn` for the call.
            #[cfg(feature = "async-client")]
            let mut rd = crate::futures_lite::io::BlockOn::new(reader);
            #[cfg(not(feature = "async-client"))]
            let mut rd = reader;
            let may_read_to_end =
                consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?;
            // Get the original reader back to be able to drain it below.
            #[cfg(feature = "async-client")]
            {
                reader = rd.into_inner();
            }
            #[cfg(not(feature = "async-client"))]
            {
                reader = rd;
            }

            if may_read_to_end {
                // Assure the final flush packet is consumed.
                // A reader that reports where it stopped is treated as having been read to the end already.
                let has_read_to_end = reader.stopped_at().is_some();
                #[cfg(feature = "async-client")]
                {
                    if !has_read_to_end {
                        futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink())
                            .await
                            .map_err(Error::ReadRemainingBytes)?;
                    }
                }
                #[cfg(not(feature = "async-client"))]
                {
                    if !has_read_to_end {
                        std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?;
                    }
                }
            }
            drop(reader);

            // Only write the shallow file if the server reported updates, otherwise the lock is dropped without writing.
            if let Some(shallow_lock) = shallow_lock {
                if !previous_response.shallow_updates().is_empty() {
                    gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
                }
            }
            Ok(Some(Outcome {
                last_response: previous_response,
                negotiate: NegotiateOutcome { action, rounds },
            }))
        }
    }
}
194
195fn acquire_shallow_lock(shallow_file: &Path) -> Result<gix_lock::File, Error> {
196    gix_lock::File::acquire_to_update_resource(shallow_file, gix_lock::acquire::Fail::Immediately, None)
197        .map_err(Into::into)
198}
199
200fn add_shallow_args(
201    args: &mut Arguments,
202    shallow: &Shallow,
203    shallow_file: &std::path::Path,
204) -> Result<(Option<Vec<gix_hash::ObjectId>>, Option<gix_lock::File>), Error> {
205    let expect_change = *shallow != Shallow::NoChange;
206    let shallow_lock = expect_change.then(|| acquire_shallow_lock(shallow_file)).transpose()?;
207
208    let shallow_commits = gix_shallow::read(shallow_file)?;
209    if (shallow_commits.is_some() || expect_change) && !args.can_use_shallow() {
210        // NOTE: if this is an issue, we can always unshallow the repo ourselves.
211        return Err(Error::MissingServerFeature {
212            feature: "shallow",
213            description: "shallow clones need server support to remain shallow, otherwise bigger than expected packs are sent effectively unshallowing the repository",
214        });
215    }
216    if let Some(shallow_commits) = &shallow_commits {
217        for commit in shallow_commits.iter() {
218            args.shallow(commit);
219        }
220    }
221    match shallow {
222        Shallow::NoChange => {}
223        Shallow::DepthAtRemote(commits) => args.deepen(commits.get() as usize),
224        Shallow::Deepen(commits) => {
225            args.deepen(*commits as usize);
226            args.deepen_relative();
227        }
228        Shallow::Since { cutoff } => {
229            args.deepen_since(cutoff.seconds);
230        }
231        Shallow::Exclude {
232            remote_refs,
233            since_cutoff,
234        } => {
235            if let Some(cutoff) = since_cutoff {
236                args.deepen_since(cutoff.seconds);
237            }
238            for ref_ in remote_refs {
239                args.deepen_not(ref_.as_ref().as_bstr());
240            }
241        }
242    }
243    Ok((shallow_commits, shallow_lock))
244}
245
246fn setup_remote_progress<'a>(
247    progress: &mut dyn gix_features::progress::DynNestedProgress,
248    reader: &mut Box<dyn crate::transport::client::ExtendedBufRead<'a> + Unpin + 'a>,
249    should_interrupt: &'a AtomicBool,
250) {
251    use crate::transport::client::ExtendedBufRead;
252    reader.set_progress_handler(Some(Box::new({
253        let mut remote_progress = progress.add_child_with_id("remote".to_string(), ProgressId::RemoteProgress.into());
254        move |is_err: bool, data: &[u8]| {
255            crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
256            if should_interrupt.load(Ordering::Relaxed) {
257                ProgressAction::Interrupt
258            } else {
259                ProgressAction::Continue
260            }
261        }
262    }) as crate::transport::client::HandleProgress<'a>));
263}