alloy_provider/provider/
prov_call.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
use alloy_json_rpc::{RpcParam, RpcReturn};
use alloy_rpc_client::{RpcCall, Waiter};
use alloy_transport::{Transport, TransportResult};
use futures::FutureExt;
use pin_project::pin_project;
use serde_json::value::RawValue;
use std::{
    future::Future,
    pin::Pin,
    task::{self, Poll},
};
use tokio::sync::oneshot;

/// The primary future type for the [`Provider`].
///
/// This future abstracts over several potential data sources. It allows
/// providers to:
/// - produce data via an [`RpcCall`]
/// - produce data by waiting on a batched RPC [`Waiter`]
/// - produce data via an arbitrary boxed future
/// - produce data in any synchronous way
///
/// Whatever the source, awaiting a `ProviderCall` resolves to a
/// `TransportResult<Output>`.
///
/// # Type parameters
///
/// - `Conn`: the transport used by the [`RpcCall`] variant.
/// - `Params`: the request parameters carried by the [`RpcCall`] variant.
/// - `Resp`: the raw response type, before mapping.
/// - `Output`: the type yielded after mapping; defaults to `Resp`.
/// - `Map`: the mapping function from `Resp` to `Output`; defaults to the
///   function pointer type `fn(Resp) -> Output`.
///
/// [`Provider`]: crate::Provider
#[pin_project(project = ProviderCallProj)]
pub enum ProviderCall<Conn, Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
    Map: Fn(Resp) -> Output,
{
    /// An underlying call to an RPC server.
    RpcCall(RpcCall<Conn, Params, Resp, Output, Map>),
    /// A waiter for a batched call to a remote RPC server.
    Waiter(Waiter<Resp, Output, Map>),
    /// A boxed future that resolves directly to the final
    /// `TransportResult<Output>`.
    BoxedFuture(Pin<Box<dyn Future<Output = TransportResult<Output>> + Send>>),
    /// The output, produced synchronously.
    ///
    /// The value is held in an `Option` so that it can be moved out when the
    /// future is polled; once taken, the slot is `None`.
    Ready(Option<TransportResult<Output>>),
}

impl<Conn, Params, Resp, Output, Map> ProviderCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
    Map: Fn(Resp) -> Output,
{
    /// Build a [`ProviderCall`] that already holds its result.
    ///
    /// The resulting future yields `output` the first time it is polled.
    pub const fn ready(output: TransportResult<Output>) -> Self {
        Self::Ready(Some(output))
    }

    /// Returns `true` when this is the [`RpcCall`] variant.
    pub const fn is_rpc_call(&self) -> bool {
        self.as_rpc_call().is_some()
    }

    /// Borrow the inner [`RpcCall`], or `None` for any other variant.
    pub const fn as_rpc_call(&self) -> Option<&RpcCall<Conn, Params, Resp, Output, Map>> {
        if let Self::RpcCall(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Mutably borrow the inner [`RpcCall`], or `None` for any other variant.
    pub fn as_mut_rpc_call(&mut self) -> Option<&mut RpcCall<Conn, Params, Resp, Output, Map>> {
        if let Self::RpcCall(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Returns `true` when this is the [`Waiter`] variant.
    pub const fn is_waiter(&self) -> bool {
        self.as_waiter().is_some()
    }

    /// Borrow the inner [`Waiter`], or `None` for any other variant.
    pub const fn as_waiter(&self) -> Option<&Waiter<Resp, Output, Map>> {
        if let Self::Waiter(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Mutably borrow the inner [`Waiter`], or `None` for any other variant.
    pub fn as_mut_waiter(&mut self) -> Option<&mut Waiter<Resp, Output, Map>> {
        if let Self::Waiter(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Returns `true` when this is the boxed-future variant.
    pub const fn is_boxed_future(&self) -> bool {
        self.as_boxed_future().is_some()
    }

    /// Borrow the inner boxed future, or `None` for any other variant.
    pub const fn as_boxed_future(
        &self,
    ) -> Option<&Pin<Box<dyn Future<Output = TransportResult<Output>> + Send>>> {
        if let Self::BoxedFuture(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Returns `true` when this is the ready variant.
    ///
    /// This is also `true` after the stored value has been taken.
    pub const fn is_ready(&self) -> bool {
        matches!(self, Self::Ready(_))
    }

    /// Borrow the stored result, or `None` if this is not the ready variant.
    ///
    /// # Panics
    ///
    /// Panics if this is the ready variant but its value has already been
    /// taken (i.e. the future is already complete).
    pub const fn as_ready(&self) -> Option<&TransportResult<Output>> {
        match self {
            Self::Ready(slot) => match slot {
                Some(output) => Some(output),
                None => panic!("tried to access ready value after taking"),
            },
            _ => None,
        }
    }

    /// Replace the response-mapping function, producing a call whose output
    /// type is `NewOutput`. Handy for turning the raw response into a more
    /// convenient type, e.g. `U64` into `u64`.
    ///
    /// # Errors
    ///
    /// Only the [`RpcCall`] and [`Waiter`] variants can be re-mapped. For any
    /// other variant the call is handed back unchanged as `Err(self)`.
    ///
    /// ## Note
    ///
    /// Carefully review the rust documentation on [fn pointers] before passing
    /// them to this function. Unless the pointer is specifically coerced to a
    /// `fn(_) -> _`, the `NewMap` will be inferred as that function's unique
    /// type. This can lead to confusing error messages.
    ///
    /// [fn pointers]: https://doc.rust-lang.org/std/primitive.fn.html#creating-function-pointers
    pub fn map_resp<NewOutput, NewMap>(
        self,
        map: NewMap,
    ) -> Result<ProviderCall<Conn, Params, Resp, NewOutput, NewMap>, Self>
    where
        NewMap: Fn(Resp) -> NewOutput + Clone,
    {
        match self {
            Self::RpcCall(rpc) => {
                let remapped = rpc.map_resp(map);
                Ok(ProviderCall::RpcCall(remapped))
            }
            Self::Waiter(pending) => {
                let remapped = pending.map_resp(map);
                Ok(ProviderCall::Waiter(remapped))
            }
            // Boxed futures and ready values have no response left to map.
            unmappable => Err(unmappable),
        }
    }
}

impl<Conn, Params, Resp, Output, Map> ProviderCall<Conn, &Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam + ToOwned,
    Params::Owned: RpcParam,
    Resp: RpcReturn,
    Map: Fn(Resp) -> Output,
{
    /// Convert this call into one with owned params, by cloning the params.
    ///
    /// Only the [`RpcCall`] variant actually carries params; it is converted
    /// via `RpcCall::into_owned_params`. The [`Waiter`], boxed-future and
    /// ready variants hold no params, so they are carried over unchanged.
    ///
    /// # Panics
    ///
    /// For the [`RpcCall`] variant, panics if called after the request has
    /// been polled. The other variants never panic.
    pub fn into_owned_params(self) -> ProviderCall<Conn, Params::Owned, Resp, Output, Map> {
        match self {
            Self::RpcCall(call) => ProviderCall::RpcCall(call.into_owned_params()),
            // None of the remaining variants mention `Params` in their
            // payload, so re-wrapping them in the owned-params type is a
            // plain move rather than a reason to panic.
            Self::Waiter(waiter) => ProviderCall::Waiter(waiter),
            Self::BoxedFuture(fut) => ProviderCall::BoxedFuture(fut),
            Self::Ready(output) => ProviderCall::Ready(output),
        }
    }
}

/// Debug formatting for [`ProviderCall`] with the default `Output` and `Map`.
///
/// The `RpcCall` variant prints its inner call. Every other variant prints
/// only its name, followed by a non-exhaustive marker.
impl<Conn, Params, Resp> std::fmt::Debug for ProviderCall<Conn, Params, Resp>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the RPC call exposes its contents; the rest share one opaque
        // rendering that differs only in the variant name.
        let variant_name = match self {
            Self::RpcCall(call) => return f.debug_tuple("RpcCall").field(call).finish(),
            Self::Waiter(_) => "Waiter",
            Self::BoxedFuture(_) => "BoxedFuture",
            Self::Ready(_) => "Ready",
        };
        f.debug_struct(variant_name).finish_non_exhaustive()
    }
}

impl<Conn, Params, Resp, Output, Map> From<RpcCall<Conn, Params, Resp, Output, Map>>
    for ProviderCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
    Map: Fn(Resp) -> Output,
{
    fn from(call: RpcCall<Conn, Params, Resp, Output, Map>) -> Self {
        Self::RpcCall(call)
    }
}

impl<Conn, Params, Resp> From<Waiter<Resp>>
    for ProviderCall<Conn, Params, Resp, Resp, fn(Resp) -> Resp>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
{
    fn from(waiter: Waiter<Resp>) -> Self {
        Self::Waiter(waiter)
    }
}

impl<Conn, Params, Resp, Output, Map>
    From<Pin<Box<dyn Future<Output = TransportResult<Output>> + Send>>>
    for ProviderCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
    Map: Fn(Resp) -> Output,
{
    fn from(fut: Pin<Box<dyn Future<Output = TransportResult<Output>> + Send>>) -> Self {
        Self::BoxedFuture(fut)
    }
}

impl<Conn, Params, Resp> From<oneshot::Receiver<TransportResult<Box<RawValue>>>>
    for ProviderCall<Conn, Params, Resp>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
{
    fn from(rx: oneshot::Receiver<TransportResult<Box<RawValue>>>) -> Self {
        Waiter::from(rx).into()
    }
}

/// Drives whichever data source this [`ProviderCall`] wraps.
///
/// The `RpcCall`, `Waiter` and `BoxedFuture` variants forward the poll to
/// their inner future. The `Ready` variant yields its stored value at once.
///
/// # Panics
///
/// Polling a `Ready` variant again after it has yielded its value panics.
impl<Conn, Params, Resp, Output, Map> Future for ProviderCall<Conn, Params, Resp, Output, Map>
where
    Conn: Transport + Clone,
    Params: RpcParam,
    Resp: RpcReturn,
    Output: 'static,
    Map: Fn(Resp) -> Output,
{
    type Output = TransportResult<Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        match self.project() {
            ProviderCallProj::RpcCall(rpc) => rpc.poll_unpin(cx),
            ProviderCallProj::Waiter(pending) => pending.poll_unpin(cx),
            // The box is already pinned, so re-borrow it as a pinned
            // reference to the inner future and poll that.
            ProviderCallProj::BoxedFuture(boxed) => boxed.as_mut().poll(cx),
            ProviderCallProj::Ready(slot) => {
                // Move the value out; a second poll finds the slot empty.
                let result = slot.take().expect("output taken twice");
                Poll::Ready(result)
            }
        }
    }
}