// gix_protocol/fetch_fn.rs

use std::borrow::Cow;

use gix_features::progress::NestedProgress;
use gix_transport::client;
use maybe_async::maybe_async;

use crate::{
    credentials,
    fetch::{Action, Arguments, Delegate, Error, Response},
    indicate_end_of_interaction, Command,
};

/// Determines how the connection underlying the transport is treated once a fetch is done, which may permit reusing it.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FetchConnection {
    /// Inform the server that the operation is complete and that no further commands will be issued. This happens
    /// at the end of the fetch operation, or after deciding that no fetch should happen once references were listed.
    ///
    /// For signalling the end of a fetch, this flag only matters in protocol V2, and in general
    /// it is only of interest for persistent transports.
    ///
    /// In most explicit client side failure modes the 'end-of-operation' notification is sent to the server
    /// automatically.
    #[default]
    TerminateOnSuccessfulCompletion,

    /// Keep persistent transport connections reusable by _not_ sending an 'end-of-operation' notification
    /// to the server, which helps if multiple `fetch(…)` calls are made in succession.
    ///
    /// This has no effect for non-persistent connections, like the ones over HTTP.
    ///
    /// Callers may also pick `AllowReuse` purely as an optimization, since the server learns that the client is done
    /// when the connection is closed.
    AllowReuse,
}

/// Perform a 'fetch' operation with the server using `transport`, with `delegate` handling all server interactions.
/// **Note** that `delegate` has blocking operations and thus this entire call should be on an executor which can handle
/// that. This could be the current thread blocking, or another thread.
///
/// * `authenticate(operation_to_perform)` is used to receive credentials for the connection and potentially store it
///   if the server indicates 'permission denied'. Note that not all transport support authentication or authorization.
/// * `progress` is used to emit progress messages.
/// * `name` is the name of the git client to present as `agent`, like `"my-app (v2.0)"`".
/// * If `trace` is `true`, all packetlines received or sent will be passed to the facilities of the `gix-trace` crate.
///
/// _Note_ that depending on the `delegate`, the actual action performed can be `ls-refs`, `clone` or `fetch`.
///
/// # WARNING - Do not use!
///
/// As it will hang when having multiple negotiation rounds.
#[allow(clippy::result_large_err)]
#[maybe_async]
// TODO: remove this without losing test coverage - we have the same but better in `gix` and it's
//       not really worth it to maintain the delegates here.
pub async fn fetch<F, D, T, P>(
    mut transport: T,
    mut delegate: D,
    authenticate: F,
    mut progress: P,
    fetch_mode: FetchConnection,
    agent: impl Into<String>,
    trace: bool,
) -> Result<(), Error>
where
    F: FnMut(credentials::helper::Action) -> credentials::protocol::Result,
    D: Delegate,
    T: client::Transport,
    P: NestedProgress + 'static,
    P::SubProgress: 'static,
{
    let crate::handshake::Outcome {
        server_protocol_version: protocol_version,
        refs,
        v1_shallow_updates: _ignored_shallow_updates_as_it_is_deprecated,
        capabilities,
    } = crate::fetch::handshake(
        &mut transport,
        authenticate,
        delegate.handshake_extra_parameters(),
        &mut progress,
    )
    .await?;

    let agent = crate::agent(agent);
    let refs = match refs {
        Some(refs) => refs,
        None => {
            crate::ls_refs(
                &mut transport,
                &capabilities,
                |a, b, c| {
                    let res = delegate.prepare_ls_refs(a, b, c);
                    c.push(("agent", Some(Cow::Owned(agent.clone()))));
                    res
                },
                &mut progress,
                trace,
            )
            .await?
        }
    };

    let fetch = Command::Fetch;
    let mut fetch_features = fetch.default_features(protocol_version, &capabilities);
    match delegate.prepare_fetch(protocol_version, &capabilities, &mut fetch_features, &refs) {
        Ok(Action::Cancel) => {
            return if matches!(protocol_version, gix_transport::Protocol::V1)
                || matches!(fetch_mode, FetchConnection::TerminateOnSuccessfulCompletion)
            {
                indicate_end_of_interaction(transport, trace).await.map_err(Into::into)
            } else {
                Ok(())
            };
        }
        Ok(Action::Continue) => {
            fetch.validate_argument_prefixes_or_panic(protocol_version, &capabilities, &[], &fetch_features);
        }
        Err(err) => {
            indicate_end_of_interaction(transport, trace).await?;
            return Err(err.into());
        }
    }

    Response::check_required_features(protocol_version, &fetch_features)?;
    let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
    fetch_features.push(("agent", Some(Cow::Owned(agent))));
    let mut arguments = Arguments::new(protocol_version, fetch_features, trace);
    let mut previous_response = None::<Response>;
    let mut round = 1;
    'negotiation: loop {
        progress.step();
        progress.set_name(format!("negotiate (round {round})"));
        round += 1;
        let action = delegate.negotiate(&refs, &mut arguments, previous_response.as_ref())?;
        let mut reader = arguments.send(&mut transport, action == Action::Cancel).await?;
        if sideband_all {
            setup_remote_progress(&mut progress, &mut reader);
        }
        let response = Response::from_line_reader(
            protocol_version,
            &mut reader,
            true,  /* hack, telling us we don't want this delegate approach anymore */
            false, /* just as much of a hack which causes us to expect a pack immediately */
        )
        .await?;
        previous_response = if response.has_pack() {
            progress.step();
            progress.set_name("receiving pack".into());
            if !sideband_all {
                setup_remote_progress(&mut progress, &mut reader);
            }
            delegate.receive_pack(reader, progress, &refs, &response).await?;
            break 'negotiation;
        } else {
            match action {
                Action::Cancel => break 'negotiation,
                Action::Continue => Some(response),
            }
        }
    }
    if matches!(protocol_version, gix_transport::Protocol::V2)
        && matches!(fetch_mode, FetchConnection::TerminateOnSuccessfulCompletion)
    {
        indicate_end_of_interaction(transport, trace).await?;
    }
    Ok(())
}

/// Install a progress handler on `reader` which forwards everything it receives to a new child of `progress`
/// named "remote".
///
/// The handler always answers with `ProgressAction::Continue`.
fn setup_remote_progress<P>(
    progress: &mut P,
    reader: &mut Box<dyn gix_transport::client::ExtendedBufRead<'_> + Unpin + '_>,
) where
    P: NestedProgress,
    P::SubProgress: 'static,
{
    // The child is created up front and then owned by the handler closure.
    let mut child = progress.add_child("remote");
    let handler: gix_transport::client::HandleProgress<'_> = Box::new(move |is_err: bool, data: &[u8]| {
        crate::RemoteProgress::translate_to_progress(is_err, data, &mut child);
        gix_transport::packetline::read::ProgressAction::Continue
    });
    reader.set_progress_handler(Some(handler));
}