multistream_select/dialer_select.rs
1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Protocol negotiation strategies for the peer acting as the dialer.
22
23use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError};
24use crate::{Negotiated, NegotiationError, Version};
25
26use futures::prelude::*;
27use std::{
28 convert::TryFrom as _,
29 iter, mem,
30 pin::Pin,
31 task::{Context, Poll},
32};
33
34/// Returns a `Future` that negotiates a protocol on the given I/O stream
35/// for a peer acting as the _dialer_ (or _initiator_).
36///
37/// This function is given an I/O stream and a list of protocols and returns a
38/// computation that performs the protocol negotiation with the remote. The
39/// returned `Future` resolves with the name of the negotiated protocol and
40/// a [`Negotiated`] I/O stream.
41///
42/// Within the scope of this library, a dialer always commits to a specific
43/// multistream-select [`Version`], whereas a listener always supports
44/// all versions supported by this library. Frictionless multistream-select
45/// protocol upgrades may thus proceed by deployments with updated listeners,
46/// eventually followed by deployments of dialers choosing the newer protocol.
47pub fn dialer_select_proto<R, I>(
48 inner: R,
49 protocols: I,
50 version: Version,
51) -> DialerSelectFuture<R, I::IntoIter>
52where
53 R: AsyncRead + AsyncWrite,
54 I: IntoIterator,
55 I::Item: AsRef<str>,
56{
57 let protocols = protocols.into_iter().peekable();
58 DialerSelectFuture {
59 version,
60 protocols,
61 state: State::SendHeader {
62 io: MessageIO::new(inner),
63 },
64 }
65}
66
67/// A `Future` returned by [`dialer_select_proto`] which negotiates
68/// a protocol iteratively by considering one protocol after the other.
69#[pin_project::pin_project]
70pub struct DialerSelectFuture<R, I: Iterator> {
71 // TODO: It would be nice if eventually N = I::Item = Protocol.
72 protocols: iter::Peekable<I>,
73 state: State<R, I::Item>,
74 version: Version,
75}
76
77enum State<R, N> {
78 SendHeader { io: MessageIO<R> },
79 SendProtocol { io: MessageIO<R>, protocol: N },
80 FlushProtocol { io: MessageIO<R>, protocol: N },
81 AwaitProtocol { io: MessageIO<R>, protocol: N },
82 Done,
83}
84
85impl<R, I> Future for DialerSelectFuture<R, I>
86where
87 // The Unpin bound here is required because we produce a `Negotiated<R>` as the output.
88 // It also makes the implementation considerably easier to write.
89 R: AsyncRead + AsyncWrite + Unpin,
90 I: Iterator,
91 I::Item: AsRef<str>,
92{
93 type Output = Result<(I::Item, Negotiated<R>), NegotiationError>;
94
95 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96 let this = self.project();
97
98 loop {
99 match mem::replace(this.state, State::Done) {
100 State::SendHeader { mut io } => {
101 match Pin::new(&mut io).poll_ready(cx)? {
102 Poll::Ready(()) => {}
103 Poll::Pending => {
104 *this.state = State::SendHeader { io };
105 return Poll::Pending;
106 }
107 }
108
109 let h = HeaderLine::from(*this.version);
110 if let Err(err) = Pin::new(&mut io).start_send(Message::Header(h)) {
111 return Poll::Ready(Err(From::from(err)));
112 }
113
114 let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
115
116 // The dialer always sends the header and the first protocol
117 // proposal in one go for efficiency.
118 *this.state = State::SendProtocol { io, protocol };
119 }
120
121 State::SendProtocol { mut io, protocol } => {
122 match Pin::new(&mut io).poll_ready(cx)? {
123 Poll::Ready(()) => {}
124 Poll::Pending => {
125 *this.state = State::SendProtocol { io, protocol };
126 return Poll::Pending;
127 }
128 }
129
130 let p = Protocol::try_from(protocol.as_ref())?;
131 if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) {
132 return Poll::Ready(Err(From::from(err)));
133 }
134 log::debug!("Dialer: Proposed protocol: {}", p);
135
136 if this.protocols.peek().is_some() {
137 *this.state = State::FlushProtocol { io, protocol }
138 } else {
139 match this.version {
140 Version::V1 => *this.state = State::FlushProtocol { io, protocol },
141 // This is the only effect that `V1Lazy` has compared to `V1`:
142 // Optimistically settling on the only protocol that
143 // the dialer supports for this negotiation. Notably,
144 // the dialer expects a regular `V1` response.
145 Version::V1Lazy => {
146 log::debug!("Dialer: Expecting proposed protocol: {}", p);
147 let hl = HeaderLine::from(Version::V1Lazy);
148 let io = Negotiated::expecting(io.into_reader(), p, Some(hl));
149 return Poll::Ready(Ok((protocol, io)));
150 }
151 }
152 }
153 }
154
155 State::FlushProtocol { mut io, protocol } => {
156 match Pin::new(&mut io).poll_flush(cx)? {
157 Poll::Ready(()) => *this.state = State::AwaitProtocol { io, protocol },
158 Poll::Pending => {
159 *this.state = State::FlushProtocol { io, protocol };
160 return Poll::Pending;
161 }
162 }
163 }
164
165 State::AwaitProtocol { mut io, protocol } => {
166 let msg = match Pin::new(&mut io).poll_next(cx)? {
167 Poll::Ready(Some(msg)) => msg,
168 Poll::Pending => {
169 *this.state = State::AwaitProtocol { io, protocol };
170 return Poll::Pending;
171 }
172 // Treat EOF error as [`NegotiationError::Failed`], not as
173 // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O
174 // stream as a permissible way to "gracefully" fail a negotiation.
175 Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)),
176 };
177
178 match msg {
179 Message::Header(v) if v == HeaderLine::from(*this.version) => {
180 *this.state = State::AwaitProtocol { io, protocol };
181 }
182 Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
183 log::debug!("Dialer: Received confirmation for protocol: {}", p);
184 let io = Negotiated::completed(io.into_inner());
185 return Poll::Ready(Ok((protocol, io)));
186 }
187 Message::NotAvailable => {
188 log::debug!(
189 "Dialer: Received rejection of protocol: {}",
190 protocol.as_ref()
191 );
192 let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
193 *this.state = State::SendProtocol { io, protocol }
194 }
195 _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
196 }
197 }
198
199 State::Done => panic!("State::poll called after completion"),
200 }
201 }
202 }
203}