use std::borrow::Cow;
use gix_features::progress::Progress;
use gix_transport::client;
use maybe_async::maybe_async;
use crate::{
credentials,
fetch::{Action, Arguments, Delegate, Error, Response},
indicate_end_of_interaction, Command,
};
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum FetchConnection {
#[default]
TerminateOnSuccessfulCompletion,
AllowReuse,
}
#[allow(clippy::result_large_err)]
#[maybe_async]
pub async fn fetch<F, D, T, P>(
mut transport: T,
mut delegate: D,
authenticate: F,
mut progress: P,
fetch_mode: FetchConnection,
agent: impl Into<String>,
) -> Result<(), Error>
where
F: FnMut(credentials::helper::Action) -> credentials::protocol::Result,
D: Delegate,
T: client::Transport,
P: Progress,
P::SubProgress: 'static,
{
let crate::handshake::Outcome {
server_protocol_version: protocol_version,
refs,
capabilities,
} = crate::fetch::handshake(
&mut transport,
authenticate,
delegate.handshake_extra_parameters(),
&mut progress,
)
.await?;
let agent = crate::agent(agent);
let refs = match refs {
Some(refs) => refs,
None => {
crate::ls_refs(
&mut transport,
&capabilities,
|a, b, c| {
let res = delegate.prepare_ls_refs(a, b, c);
c.push(("agent", Some(Cow::Owned(agent.clone()))));
res
},
&mut progress,
)
.await?
}
};
let fetch = Command::Fetch;
let mut fetch_features = fetch.default_features(protocol_version, &capabilities);
match delegate.prepare_fetch(protocol_version, &capabilities, &mut fetch_features, &refs) {
Ok(Action::Cancel) => {
return if matches!(protocol_version, gix_transport::Protocol::V1)
|| matches!(fetch_mode, FetchConnection::TerminateOnSuccessfulCompletion)
{
indicate_end_of_interaction(transport).await.map_err(Into::into)
} else {
Ok(())
};
}
Ok(Action::Continue) => {
fetch.validate_argument_prefixes_or_panic(protocol_version, &capabilities, &[], &fetch_features);
}
Err(err) => {
indicate_end_of_interaction(transport).await?;
return Err(err.into());
}
}
Response::check_required_features(protocol_version, &fetch_features)?;
let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
fetch_features.push(("agent", Some(Cow::Owned(agent))));
let mut arguments = Arguments::new(protocol_version, fetch_features);
let mut previous_response = None::<Response>;
let mut round = 1;
'negotiation: loop {
progress.step();
progress.set_name(format!("negotiate (round {round})"));
round += 1;
let action = delegate.negotiate(&refs, &mut arguments, previous_response.as_ref())?;
let mut reader = arguments.send(&mut transport, action == Action::Cancel).await?;
if sideband_all {
setup_remote_progress(&mut progress, &mut reader);
}
let response = Response::from_line_reader(protocol_version, &mut reader).await?;
previous_response = if response.has_pack() {
progress.step();
progress.set_name("receiving pack");
if !sideband_all {
setup_remote_progress(&mut progress, &mut reader);
}
delegate.receive_pack(reader, progress, &refs, &response).await?;
break 'negotiation;
} else {
match action {
Action::Cancel => break 'negotiation,
Action::Continue => Some(response),
}
}
}
if matches!(protocol_version, gix_transport::Protocol::V2)
&& matches!(fetch_mode, FetchConnection::TerminateOnSuccessfulCompletion)
{
indicate_end_of_interaction(transport).await?;
}
Ok(())
}
fn setup_remote_progress<P>(progress: &mut P, reader: &mut Box<dyn gix_transport::client::ExtendedBufRead + Unpin + '_>)
where
P: Progress,
P::SubProgress: 'static,
{
reader.set_progress_handler(Some(Box::new({
let mut remote_progress = progress.add_child("remote");
move |is_err: bool, data: &[u8]| {
crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
gix_transport::packetline::read::ProgressAction::Continue
}
}) as gix_transport::client::HandleProgress));
}