1use crate::fetch::{
2 negotiate, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, Shallow, Tags,
3};
4use crate::{fetch::Arguments, transport::packetline::read::ProgressAction};
5use gix_features::progress::DynNestedProgress;
6use std::path::Path;
7use std::sync::atomic::{AtomicBool, Ordering};
8
9#[maybe_async::maybe_async]
32pub async fn fetch<P, T, E>(
33 negotiate: &mut impl Negotiate,
34 consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<bool, E>,
35 mut progress: P,
36 should_interrupt: &AtomicBool,
37 Context {
38 handshake,
39 transport,
40 user_agent,
41 trace_packetlines,
42 }: Context<'_, T>,
43 Options {
44 shallow_file,
45 shallow,
46 tags,
47 reject_shallow_remote,
48 }: Options<'_>,
49) -> Result<Option<Outcome>, Error>
50where
51 P: gix_features::progress::NestedProgress,
52 P::SubProgress: 'static,
53 T: gix_transport::client::Transport,
54 E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
55{
56 let _span = gix_trace::coarse!("gix_protocol::fetch()");
57 let v1_shallow_updates = handshake.v1_shallow_updates.take();
58 let protocol_version = handshake.server_protocol_version;
59
60 let fetch = crate::Command::Fetch;
61 let fetch_features = {
62 let mut f = fetch.default_features(protocol_version, &handshake.capabilities);
63 f.push(user_agent);
64 f
65 };
66
67 crate::fetch::Response::check_required_features(protocol_version, &fetch_features)?;
68 let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
69 let mut arguments = Arguments::new(protocol_version, fetch_features, trace_packetlines);
70 if matches!(tags, Tags::Included) {
71 if !arguments.can_use_include_tag() {
72 return Err(Error::MissingServerFeature {
73 feature: "include-tag",
74 description:
75 "To make this work we would have to implement another pass to fetch attached tags separately",
77 });
78 }
79 arguments.use_include_tag();
80 }
81 let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, shallow, &shallow_file)?;
82
83 let negotiate_span = gix_trace::detail!(
84 "negotiate",
85 protocol_version = handshake.server_protocol_version as usize
86 );
87 let action = negotiate.mark_complete_and_common_ref()?;
88 let mut previous_response = None::<crate::fetch::Response>;
89 match &action {
90 negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => Ok(None),
91 negotiate::Action::MustNegotiate {
92 remote_ref_target_known,
93 } => {
94 if !negotiate.add_wants(&mut arguments, remote_ref_target_known) {
95 return Ok(None);
96 }
97 let mut rounds = Vec::new();
98 let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests());
99 let mut state = negotiate::one_round::State::new(is_stateless);
100 let mut reader = 'negotiation: loop {
101 let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1);
102 progress.step();
103 progress.set_name(format!("negotiate (round {})", rounds.len() + 1));
104 if should_interrupt.load(Ordering::Relaxed) {
105 return Err(Error::Negotiate(negotiate::Error::NegotiationFailed {
106 rounds: rounds.len(),
107 }));
108 }
109
110 let is_done = match negotiate.one_round(&mut state, &mut arguments, previous_response.as_ref()) {
111 Ok((round, is_done)) => {
112 rounds.push(round);
113 is_done
114 }
115 Err(err) => {
116 return Err(err.into());
117 }
118 };
119 let mut reader = arguments.send(transport, is_done).await?;
120 if sideband_all {
121 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
122 }
123 let response =
124 crate::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done, !is_done).await?;
125 let has_pack = response.has_pack();
126 previous_response = Some(response);
127 if has_pack {
128 progress.step();
129 progress.set_name("receiving pack".into());
130 if !sideband_all {
131 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
132 }
133 break 'negotiation reader;
134 }
135 };
136 drop(negotiate_span);
137
138 let mut previous_response = previous_response.expect("knowledge of a pack means a response was received");
139 previous_response.append_v1_shallow_updates(v1_shallow_updates);
140 if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
141 if reject_shallow_remote {
142 return Err(Error::RejectShallowRemote);
143 }
144 shallow_lock = acquire_shallow_lock(&shallow_file).map(Some)?;
145 }
146
147 #[cfg(feature = "async-client")]
148 let mut rd = crate::futures_lite::io::BlockOn::new(reader);
149 #[cfg(not(feature = "async-client"))]
150 let mut rd = reader;
151 let may_read_to_end =
152 consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?;
153 #[cfg(feature = "async-client")]
154 {
155 reader = rd.into_inner();
156 }
157 #[cfg(not(feature = "async-client"))]
158 {
159 reader = rd;
160 }
161
162 if may_read_to_end {
163 let has_read_to_end = reader.stopped_at().is_some();
165 #[cfg(feature = "async-client")]
166 {
167 if !has_read_to_end {
168 futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink())
169 .await
170 .map_err(Error::ReadRemainingBytes)?;
171 }
172 }
173 #[cfg(not(feature = "async-client"))]
174 {
175 if !has_read_to_end {
176 std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?;
177 }
178 }
179 }
180 drop(reader);
181
182 if let Some(shallow_lock) = shallow_lock {
183 if !previous_response.shallow_updates().is_empty() {
184 gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
185 }
186 }
187 Ok(Some(Outcome {
188 last_response: previous_response,
189 negotiate: NegotiateOutcome { action, rounds },
190 }))
191 }
192 }
193}
194
195fn acquire_shallow_lock(shallow_file: &Path) -> Result<gix_lock::File, Error> {
196 gix_lock::File::acquire_to_update_resource(shallow_file, gix_lock::acquire::Fail::Immediately, None)
197 .map_err(Into::into)
198}
199
200fn add_shallow_args(
201 args: &mut Arguments,
202 shallow: &Shallow,
203 shallow_file: &std::path::Path,
204) -> Result<(Option<Vec<gix_hash::ObjectId>>, Option<gix_lock::File>), Error> {
205 let expect_change = *shallow != Shallow::NoChange;
206 let shallow_lock = expect_change.then(|| acquire_shallow_lock(shallow_file)).transpose()?;
207
208 let shallow_commits = gix_shallow::read(shallow_file)?;
209 if (shallow_commits.is_some() || expect_change) && !args.can_use_shallow() {
210 return Err(Error::MissingServerFeature {
212 feature: "shallow",
213 description: "shallow clones need server support to remain shallow, otherwise bigger than expected packs are sent effectively unshallowing the repository",
214 });
215 }
216 if let Some(shallow_commits) = &shallow_commits {
217 for commit in shallow_commits.iter() {
218 args.shallow(commit);
219 }
220 }
221 match shallow {
222 Shallow::NoChange => {}
223 Shallow::DepthAtRemote(commits) => args.deepen(commits.get() as usize),
224 Shallow::Deepen(commits) => {
225 args.deepen(*commits as usize);
226 args.deepen_relative();
227 }
228 Shallow::Since { cutoff } => {
229 args.deepen_since(cutoff.seconds);
230 }
231 Shallow::Exclude {
232 remote_refs,
233 since_cutoff,
234 } => {
235 if let Some(cutoff) = since_cutoff {
236 args.deepen_since(cutoff.seconds);
237 }
238 for ref_ in remote_refs {
239 args.deepen_not(ref_.as_ref().as_bstr());
240 }
241 }
242 }
243 Ok((shallow_commits, shallow_lock))
244}
245
246fn setup_remote_progress<'a>(
247 progress: &mut dyn gix_features::progress::DynNestedProgress,
248 reader: &mut Box<dyn crate::transport::client::ExtendedBufRead<'a> + Unpin + 'a>,
249 should_interrupt: &'a AtomicBool,
250) {
251 use crate::transport::client::ExtendedBufRead;
252 reader.set_progress_handler(Some(Box::new({
253 let mut remote_progress = progress.add_child_with_id("remote".to_string(), ProgressId::RemoteProgress.into());
254 move |is_err: bool, data: &[u8]| {
255 crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
256 if should_interrupt.load(Ordering::Relaxed) {
257 ProgressAction::Interrupt
258 } else {
259 ProgressAction::Continue
260 }
261 }
262 }) as crate::transport::client::HandleProgress<'a>));
263}