alloy_rpc_client/
call.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
use alloy_json_rpc::{
    transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcParam,
    RpcResult, RpcReturn,
};
use alloy_transport::{RpcFut, Transport, TransportError, TransportResult};
use core::panic;
use futures::FutureExt;
use serde_json::value::RawValue;
use std::{
    fmt,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    task::{self, ready, Poll::Ready},
};
use tower::Service;

/// The states of the [`RpcCall`] future.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project(project = CallStateProj)]
enum CallState<Params, Conn>
where
    Params: RpcParam,
    Conn: Transport + Clone,
{
    Prepared {
        request: Option<Request<Params>>,
        connection: Conn,
    },
    AwaitingResponse {
        #[pin]
        fut: <Conn as Service<RequestPacket>>::Future,
    },
    Complete,
}

impl<Params, Conn> Clone for CallState<Params, Conn>
where
    Params: RpcParam,
    Conn: Transport + Clone,
{
    fn clone(&self) -> Self {
        match self {
            Self::Prepared { request, connection } => {
                Self::Prepared { request: request.clone(), connection: connection.clone() }
            }
            _ => panic!("cloned after dispatch"),
        }
    }
}

impl<Params, Conn> fmt::Debug for CallState<Params, Conn>
where
    Params: RpcParam,
    Conn: Transport + Clone,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Prepared { .. } => "Prepared",
            Self::AwaitingResponse { .. } => "AwaitingResponse",
            Self::Complete => "Complete",
        })
    }
}

impl<Params, Conn> Future for CallState<Params, Conn>
where
    Conn: Transport + Clone,
    Params: RpcParam,
{
    type Output = TransportResult<Box<RawValue>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        loop {
            match self.as_mut().project() {
                CallStateProj::Prepared { connection, request } => {
                    if let Err(e) =
                        task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
                    {
                        self.set(Self::Complete);
                        return Ready(RpcResult::Err(e));
                    }

                    let request = request.take().expect("no request");
                    debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
                    trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
                    let request = request.serialize();
                    let fut = match request {
                        Ok(request) => {
                            trace!(request=%request.serialized(), "serialized request");
                            connection.call(request.into())
                        }
                        Err(err) => {
                            trace!(?err, "failed to serialize request");
                            self.set(Self::Complete);
                            return Ready(RpcResult::Err(TransportError::ser_err(err)));
                        }
                    };
                    self.set(Self::AwaitingResponse { fut });
                }
                CallStateProj::AwaitingResponse { fut } => {
                    let res = match task::ready!(fut.poll(cx)) {
                        Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
                        Err(e) => Ready(RpcResult::Err(e)),
                        _ => panic!("received batch response from single request"),
                    };
                    self.set(Self::Complete);
                    return res;
                }
                CallStateProj::Complete => {
                    panic!("Polled after completion");
                }
            }
        }
    }
}

/// A prepared, but unsent, RPC call.
///
/// This is a future that will send the request when polled. It contains a
/// [`Request`], a [`Transport`], and knowledge of its expected response
/// type. Upon awaiting, it will send the request and wait for the response. It
/// will then deserialize the response into the expected type.
///
/// Errors are captured in the [`RpcResult`] type. Rpc Calls will result in
/// either a successful response of the `Resp` type, an error response, or a
/// transport error.
///
/// ### Note
///
/// Serializing the request is done lazily. The request is not serialized until
/// the future is polled. This differs from the behavior of
/// [`crate::BatchRequest`], which serializes greedily. This is because the
/// batch request must immediately erase the `Param` type to allow batching of
/// requests with different `Param` types, while the `RpcCall` may do so lazily.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
#[derive(Clone)]
pub struct RpcCall<Conn, Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Map: FnOnce(Resp) -> Output,
{
    #[pin]
    state: CallState<Params, Conn>,
    map: Option<Map>,
    _pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
}

impl<Conn, Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Map: FnOnce(Resp) -> Output,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("RpcCall").field("state", &self.state).finish()
    }
}

impl<Conn, Params, Resp> RpcCall<Conn, Params, Resp>
where
    Conn: Transport + Clone,
    Params: RpcParam,
{
    #[doc(hidden)]
    pub fn new(req: Request<Params>, connection: Conn) -> Self {
        Self {
            state: CallState::Prepared { request: Some(req), connection },
            map: Some(std::convert::identity),
            _pd: PhantomData,
        }
    }
}

impl<Conn, Params, Resp, Output, Map> RpcCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Map: FnOnce(Resp) -> Output,
{
    /// Map the response to a different type. This is usable for converting
    /// the response to a more usable type, e.g. changing `U64` to `u64`.
    ///
    /// ## Note
    ///
    /// Carefully review the rust documentation on [fn pointers] before passing
    /// them to this function. Unless the pointer is specifically coerced to a
    /// `fn(_) -> _`, the `NewMap` will be inferred as that function's unique
    /// type. This can lead to confusing error messages.
    ///
    /// [fn pointers]: https://doc.rust-lang.org/std/primitive.fn.html#creating-function-pointers
    pub fn map_resp<NewOutput, NewMap>(
        self,
        map: NewMap,
    ) -> RpcCall<Conn, Params, Resp, NewOutput, NewMap>
    where
        NewMap: FnOnce(Resp) -> NewOutput,
    {
        RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
    }

    /// Returns `true` if the request is a subscription.
    ///
    /// # Panics
    ///
    /// Panics if called after the request has been sent.
    pub fn is_subscription(&self) -> bool {
        self.request().meta.is_subscription()
    }

    /// Set the request to be a non-standard subscription (i.e. not
    /// "eth_subscribe").
    ///
    /// # Panics
    ///
    /// Panics if called after the request has been sent.
    pub fn set_is_subscription(&mut self) {
        self.request_mut().meta.set_is_subscription();
    }

    /// Set the subscription status of the request.
    pub fn set_subscription_status(&mut self, status: bool) {
        self.request_mut().meta.set_subscription_status(status);
    }

    /// Get a mutable reference to the params of the request.
    ///
    /// This is useful for modifying the params after the request has been
    /// prepared.
    ///
    /// # Panics
    ///
    /// Panics if called after the request has been sent.
    pub fn params(&mut self) -> &mut Params {
        &mut self.request_mut().params
    }

    /// Returns a reference to the request.
    ///
    /// # Panics
    ///
    /// Panics if called after the request has been sent.
    pub fn request(&self) -> &Request<Params> {
        let CallState::Prepared { request, .. } = &self.state else {
            panic!("Cannot get request after request has been sent");
        };
        request.as_ref().expect("no request in prepared")
    }

    /// Returns a mutable reference to the request.
    ///
    /// # Panics
    ///
    /// Panics if called after the request has been sent.
    pub fn request_mut(&mut self) -> &mut Request<Params> {
        let CallState::Prepared { request, .. } = &mut self.state else {
            panic!("Cannot get request after request has been sent");
        };
        request.as_mut().expect("no request in prepared")
    }

    /// Map the params of the request into a new type.
    pub fn map_params<NewParams: RpcParam>(
        self,
        map: impl Fn(Params) -> NewParams,
    ) -> RpcCall<Conn, NewParams, Resp, Output, Map> {
        let CallState::Prepared { request, connection } = self.state else {
            panic!("Cannot get request after request has been sent");
        };
        let request = request.expect("no request in prepared").map_params(map);
        RpcCall {
            state: CallState::Prepared { request: Some(request), connection },
            map: self.map,
            _pd: PhantomData,
        }
    }
}

impl<Conn, Params, Resp, Output, Map> RpcCall<Conn, &Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam + ToOwned,
    Params::Owned: RpcParam,
    Map: FnOnce(Resp) -> Output,
{
    /// Convert this call into one with owned params, by cloning the params.
    ///
    /// # Panics
    ///
    /// Panics if called after the request has been polled.
    pub fn into_owned_params(self) -> RpcCall<Conn, Params::Owned, Resp, Output, Map> {
        let CallState::Prepared { request, connection } = self.state else {
            panic!("Cannot get params after request has been sent");
        };
        let request = request.expect("no request in prepared").into_owned_params();

        RpcCall {
            state: CallState::Prepared { request: Some(request), connection },
            map: self.map,
            _pd: PhantomData,
        }
    }
}

impl<'a, Conn, Params, Resp, Output, Map> RpcCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam + 'a,
    Resp: RpcReturn,
    Output: 'static,
    Map: FnOnce(Resp) -> Output + Send + 'a,
{
    /// Convert this future into a boxed, pinned future, erasing its type.
    pub fn boxed(self) -> RpcFut<'a, Output> {
        Box::pin(self)
    }
}

impl<Conn, Params, Resp, Output, Map> Future for RpcCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
    Output: 'static,
    Map: FnOnce(Resp) -> Output,
{
    type Output = TransportResult<Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        trace!(?self.state, "polling RpcCall");

        let this = self.get_mut();
        let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));

        Ready(resp.map(this.map.take().expect("polled after completion")))
    }
}