ic_web3_rs/transports/batch.rs

//! Batching Transport

use crate::{
    error::{self, Error},
    rpc, BatchTransport, RequestId, Transport,
};
use futures::{
    channel::oneshot,
    ready,
    task::{Context, Poll},
    Future, FutureExt,
};
use parking_lot::Mutex;
use std::{collections::BTreeMap, pin::Pin, sync::Arc};

use super::ic_http_client::CallOptions;

type Pending = oneshot::Sender<error::Result<rpc::Value>>;
type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;

/// Transport that allows batching queries together.
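///
/// A minimal usage sketch (illustrative only; it assumes an underlying
/// `BatchTransport` value named `inner` and that `CallOptions` implements
/// `Default`):
///
/// ```ignore
/// let batch = Batch::new(inner);
///
/// // Queue two calls; nothing is sent over the wire yet.
/// let (id, call) = batch.prepare("eth_blockNumber", vec![]);
/// let block_number = batch.send(id, call, CallOptions::default());
/// let (id, call) = batch.prepare("eth_gasPrice", vec![]);
/// let gas_price = batch.send(id, call, CallOptions::default());
///
/// // Dispatch the whole batch, then await the individual results.
/// batch.submit_batch().await?;
/// let block_number = block_number.await?;
/// let gas_price = gas_price.await?;
/// ```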
#[derive(Debug, Clone)]
pub struct Batch<T> {
    transport: T,
    pending: PendingRequests,
    batch: Arc<Mutex<Vec<(RequestId, rpc::Call)>>>,
}

impl<T> Batch<T>
where
    T: BatchTransport,
{
    /// Creates a new `Batch` transport wrapping an existing transport that supports batch requests.
    pub fn new(transport: T) -> Self {
        Batch {
            transport,
            pending: Default::default(),
            batch: Default::default(),
        }
    }

    /// Sends all queued requests as a single batch.
    ///
    /// Results are matched back to the queued requests by position, so the
    /// underlying transport is expected to return responses in request order.
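    ///
    /// A small sketch of consuming the batch result directly (illustrative
    /// only; it assumes calls have already been queued through `send`):
    ///
    /// ```ignore
    /// // `results` holds the per-call outcomes in queue order; the futures
    /// // returned by `send` are completed as a side effect of this call.
    /// let results = batch.submit_batch().await?;
    /// ```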
    pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
        // Take ownership of the queued calls, leaving an empty queue behind.
        let batch = std::mem::take(&mut *self.batch.lock());
        let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();

        let batch = self.transport.send_batch(batch);
        let pending = self.pending.clone();

        async move {
            let res = batch.await;
            let mut pending = pending.lock();
            for (idx, request_id) in ids.into_iter().enumerate() {
                if let Some(tx) = pending.remove(&request_id) {
                    // Ignore send errors: the receiver may already have been dropped.
                    let _ = match res {
                        Ok(ref results) if results.len() > idx => tx.send(results[idx].clone()),
                        Err(ref err) => tx.send(Err(err.clone())),
                        _ => tx.send(Err(Error::Internal)),
                    };
                }
            }
            res
        }
    }
}

impl<T> Transport for Batch<T>
where
    T: BatchTransport,
{
    type Out = SingleResult;

    fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
        self.transport.prepare(method, params)
    }

    fn send(&self, id: RequestId, request: rpc::Call, _options: CallOptions) -> Self::Out {
        // Register a channel for the eventual response and queue the call;
        // nothing is dispatched until `submit_batch` is called.
        let (tx, rx) = oneshot::channel();
        self.pending.lock().insert(id, tx);
        self.batch.lock().push((id, request));

        SingleResult(rx)
    }

    fn set_max_response_bytes(&mut self, v: u64) {
        self.transport.set_max_response_bytes(v);
    }
}

/// Result of calling a single method that will be part of the batch.
/// Converts the `oneshot::Receiver` error into `Error::Internal`.
pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);

impl Future for SingleResult {
    type Output = error::Result<rpc::Value>;

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        // A dropped sender (a request that was never completed) is reported as `Error::Internal`.
        Poll::Ready(ready!(self.0.poll_unpin(ctx)).map_err(|_| Error::Internal)?)
    }
}